Imbalanced workload between workers

Imbalanced workload between workers

Pieter Hameete
Hi guys,

Currently I am running a job in the GCloud in a configuration with 4 task managers that each have 4 CPUs (for a total parallelism of 16). 

However, I noticed my job is running much slower than expected and after some more investigation I found that one of the workers is doing a majority of the work (its CPU load was at 100% while the others were almost idle). 

My job execution plan can be found here: http://i.imgur.com/fHKhVFf.png

The input is split into multiple files so loading the data is properly distributed over the workers.

I am wondering if you can provide me with some tips on how to figure out what is going wrong here:
  • Could this imbalance in workload be the result of an imbalance in the hash partitioning?
    • Is there a convenient way to see how many elements each worker gets to process? Would it work to write the output of the CoGroup to disk, since each worker writes to its own output file, and investigate the differences? (One way to count elements per subtask is sketched below this list.)
  • Is there something strange about the execution plan that could cause this?
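
For illustration, a minimal sketch (in Java, with made-up names and element types, not taken from the actual job) of one way to count how many elements each parallel subtask receives after hash partitioning on the CoGroup key:

    import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class PartitionSkewCheck {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical stand-in for the keyed DataSet that feeds the CoGroup.
            DataSet<Tuple2<String, Long>> keyed = env.fromElements(
                    new Tuple2<>("a", 1L), new Tuple2<>("b", 2L), new Tuple2<>("a", 3L));

            keyed
                // Hash-partition on the same field the CoGroup keys on (field 0 here).
                .partitionByHash(0)
                // Count how many elements each parallel subtask receives.
                .mapPartition(
                    new RichMapPartitionFunction<Tuple2<String, Long>, Tuple2<Integer, Long>>() {
                        @Override
                        public void mapPartition(Iterable<Tuple2<String, Long>> values,
                                                 Collector<Tuple2<Integer, Long>> out) {
                            long count = 0;
                            for (Tuple2<String, Long> ignored : values) {
                                count++;
                            }
                            // Emit (subtask index, number of elements seen by this subtask).
                            out.collect(new Tuple2<>(
                                    getRuntimeContext().getIndexOfThisSubtask(), count));
                        }
                    })
                // A heavily skewed key distribution shows up as one very large count.
                // (On older Flink versions print() may additionally require env.execute().)
                .print();
        }
    }

If these counts differ strongly between subtasks, the key distribution itself is skewed rather than the assignment of subtasks to workers.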
Thanks and kind regards,

Pieter

Re: Imbalanced workload between workers

Till Rohrmann
Could it be that your data is skewed? This could lead to different loads on different task managers.

With the latest Flink version, the web interface should show you how many bytes each operator has written and received. There you could see if one operator receives more elements than the others.

Cheers,
Till

Re: Imbalanced workload between workers

Pieter Hameete

Cheers for the quick reply, Till.

That would be very useful information to have! I'll upgrade my project to Flink 0.10.1 tonight and let you know if I can find out whether there's a skew in the data :-)

- Pieter


Re: Imbalanced workload between workers

Pieter Hameete
Hi Till,

I've upgraded to Flink 0.10.1 and ran the job again without any changes to the code, to see the bytes in and out of the operators for the different workers. To my surprise the load is very well balanced between all workers, and because of this the job completed much faster.

Are there any changes/fixes between Flink 0.9.1 and 0.10.1 that could cause this to be better for me now?

Thanks,

Pieter

Re: Imbalanced workload between workers

Stephan Ewen
Hi Pieter!

Interesting, but good :-)

I don't think we did much on the hash functions since 0.9.1. I am a bit surprised that it made such a difference. Well, as long as it improves with the newer version :-)

Greetings,
Stephan


Re: Imbalanced workload between workers

Pieter Hameete
Hi Stephan, Till,

I've watched the job again; please see the log of the CoGroup operator below:

[inline image: per-worker statistics of the CoGroup operator]

All workers get to process a fairly evenly distributed amount of bytes and records, BUT hadoop-w-0, hadoop-w-2 and hadoop-w-3 don't start working until hadoop-w-1 is finished. Is this behavior to be expected with a CoGroup, or could there still be something wrong in the distribution of the data?

Kind regards,

Pieter

Re: Imbalanced workload between workers

Till Rohrmann
Hi Pieter,

you can see in the log that the operators are all started at the same time. However, you're right that they don't finish at the same time. The subtasks that run on the same node exhibit a similar runtime, but the nodes themselves (not only hadoop-w-0 compared to the others) show different runtimes. I would guess that this is due to some other load on the GCloud machines or some other kind of asymmetry between the hosts.

Cheers,
Till

Re: Imbalanced workload between workers

Stephan Ewen
Hey!

Which CoGroup in the plan are the statistics for? For the first or the second one?

Stephan


Re: Imbalanced workload between workers

Pieter Hameete
In reply to this post by Till Rohrmann
Hi Till, Stephan,

Indeed, I think you're right that it is just a coincidence that the three other workers started producing output as soon as the first one finished. Getting back to my comment that the job was much faster after the upgrade to Flink 0.10.1, I searched a bit further. I've now noticed that after setting up a fresh GCloud cluster the job runs roughly 15x faster than the case I just showed you. I'm executing many, many jobs in sequence, and it seems as if performance degrades a lot over time. Let me tell you what I'm doing exactly:

I'm running a set of 16 queries in sequence on the cluster. Each query is executed multiple (5) times in a row so I can average the measurements for a more reliable estimate of performance in the GCloud. This means I'm running a total of 60 jobs in sequence, and on 5 different datasets, totalling 300 jobs. For each job I get the execution environment, define the query, write to an output file and run env.execute(). After each job I clear the output folder so the next job can start fresh.
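
A minimal sketch of such a driver loop (in Java, with hypothetical query and path names, not the actual code) might look like:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class BenchmarkDriver {

        public static void main(String[] args) throws Exception {
            String inputPath   = args[0];  // e.g. gs://dawn-flink/data/split4G
            String outputPath  = args[1];  // e.g. gs://dawn-flink/output/
            int    repetitions = Integer.parseInt(args[2]);

            for (int query = 1; query <= 16; query++) {
                for (int run = 1; run <= repetitions; run++) {
                    // A fresh execution environment is obtained for every job.
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                    DataSet<String> result = buildQuery(env, query, inputPath);
                    result.writeAsText(outputPath + "query-" + query + "-run-" + run);

                    long millis = env.execute("query " + query + " run " + run).getNetRuntime();
                    System.out.println(query + "," + run + "," + millis);

                    // The output folder is cleared between jobs (not shown here).
                }
            }
        }

        // Hypothetical placeholder for the actual query definitions.
        private static DataSet<String> buildQuery(ExecutionEnvironment env, int query, String input) {
            return env.readTextFile(input);
        }
    }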

Now I'm wondering what causes this performance degradation. Could this have to do with the GCloud (I surely hope they don't scale back my performance by 90% ;-) ), do I need to do some more cleaning and resetting on the Flink cluster, or is my approach above asking for trouble?

Thanks again for your help!

- Pieter





Re: Imbalanced workload between workers

Pieter Hameete
In reply to this post by Stephan Ewen
Hi Stephan,

it was the first CoGroup :-)

- Pieter

Re: Imbalanced workload between workers

Pieter Hameete
In reply to this post by Pieter Hameete
Hi Till and Stephan, 

I've got some additional information. I just ran one query 60 times in a row, and all runs took roughly the same time. I did this both for a simple job that requires intensive parsing and for a more complex query with several CoGroups. Neither case showed any slowdown. I believe this indicates that the problem is not unreliable I/O or computational resources.

However, when I now run the complete set of 15 different queries again, with 3 repetitions each, things start slowing down very quickly.


I run this as follows in the GCloud: ./flink run dawn-flink.jar gs://dawn-flink/data/split4G gs://dawn-flink/output/ 3 result.csv

Am I doing something incorrectly by running these different queries in sequence like this? I have also tried parametrizing further so that I call ./flink run once for each separate query, but this did not make a difference.

Cheers!

- Pieter



Re: Imbalanced workload between workers

Stephan Ewen
Hi!

Thank you for looking into this.

Such a gradual slowdown is often caused by memory leaks, which reduce the available memory and make GC more expensive.

Can you create a heap dump after the series of runs that causes the slowdown? Then we can have a look at whether there are leaked Flink objects, or whether the user code is causing it.
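
(For reference: a heap dump of a running TaskManager can usually be taken with the JDK's jmap tool, e.g. jmap -dump:format=b,file=/tmp/taskmanager.hprof <taskmanager-pid>, where the PID placeholder has to be replaced with that of the actual TaskManager JVM.)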

Greetings,
Stephan


Re: Imbalanced workload between workers

Pieter Hameete
Hi Stephan,

Cheers for your response. I have the heap dumps; you can download them here (note that they are fairly large: tarballs of up to 2 GB each, up to 12 GB untarred):

Job Manager:


Task Manager 0:


Task Manager 1:


I've also attempted to reproduce the issue on a local cluster with a single 4-CPU task manager; however, no slowdown has occurred so far.

Please let me know if there is anything else I can provide you with.

- Pieter

Re: Imbalanced workload between workers

Stephan Ewen
Super, thanks a lot.

I think we need to find some time and look through this, but this should help a lot!

Greetings,
Stephan


Re: Imbalanced workload between workers

Pieter Hameete
Hi Stephan,

Did you manage to take a look at the dumps? I have looked myself, but I could not find anything that points towards a memory leak in my user code. I have a parser that creates quite a lot of objects, but these are not abundantly present in the heap dump. I did see that the majority of the heap consisted of byte objects (is this normal?), and another dump was dominated by Akka objects.
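
(Note: a heap dominated by byte arrays is not necessarily suspicious in itself; Flink allocates its managed memory as byte[]-backed memory segments, which typically account for a large share of a TaskManager heap.)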

I am not familiar enough with Flink to know what is normal to see in the heap dumps. I am looking forward to hearing what you think :-)

- Pieter

Reply | Threaded
Open this post in threaded view
|

Re: Imbalanced workload between workers

Stephan Ewen
You are using Flink 0.10, right?

The byte[] are simply Flink's reserved managed memory. It is expected that they occupy a large portion of the memory (assuming you run 0.10 in BATCH mode).
These byte[] should also not grow or shrink in number, so they should not cause the degradation.

I also cannot spot anything suspicious in the after-slowdown dumps. The fact that the byte[] make up 80% of the memory means that nothing else can really be leaking much.

Did you configure a specific amount of memory (taskmanager.memory.size or taskmanager.memory.fraction)?

Greetings,
Stephan
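
For context, both options are flink-conf.yaml entries. A sketch of the relevant keys, using the 12 GB heap Pieter mentions further down in the thread and otherwise the documented defaults of that release:

    # flink-conf.yaml (values shown here only for illustration)
    taskmanager.heap.mb: 12288          # total TaskManager JVM heap in MB
    taskmanager.memory.fraction: 0.7    # share of the heap reserved as managed memory (the default)
    # taskmanager.memory.size: 8192     # absolute managed memory in MB; when set, it takes precedence over the fraction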



Reply | Threaded
Open this post in threaded view
|

Re: Imbalanced workload between workers

Stephan Ewen
EDIT: The byte[] make up 98% of the memory. That seems correct. 80% would have meant that something else is consuming a lot, which should not be the case.

Reply | Threaded
Open this post in threaded view
|

Re: Imbalanced workload between workers

Pieter Hameete
Hi Stephan,

cheers for looking at this :-) The TaskManagers have 12 GB of available heap space configured via taskmanager.heap.mb. The settings you mention should be at their defaults.

- Pieter
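
As a rough cross-check, with the default taskmanager.memory.fraction of 0.7 and ignoring the slice reserved for network buffers:

    managed memory ≈ 0.7 × 12288 MB ≈ 8600 MB

which matches the 8.x GB of byte[] reported from the heap dumps in the next message.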



Reply | Threaded
Open this post in threaded view
|

Re: Imbalanced workload between workers

Stephan Ewen
Hi!

Then the heap dump actually looks non-suspicious. The 8.x GB of managed memory corresponds to what is expected.

If there is no memory leak, what else could be the reason?

  - Thread leaks? You can try to create thread dumps (via jstack) after the slowdown, between invocations of the program.

  - Is the JIT compiler behaving oddly, eventually not compiling as aggressively? Maybe classes do not get properly unloaded, or the code cache (where the JIT-compiled assembly code is stored) overflows.

Can you check at the beginning of the log files which garbage collector is set, and whether class unloading is activated?

Greetings,
Stephan
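
For the checks listed above, the usual JDK commands and HotSpot flags look roughly like this; passing them via env.java.opts is an assumption about how the cluster is set up, and <pid> is a placeholder for the TaskManager process id:

    # Thread dump between program invocations, to spot leaked threads.
    jstack <pid> > threads-after-slowdown.txt

    # In flink-conf.yaml: make GC and class-unloading activity visible in the TaskManager logs.
    env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+TraceClassUnloading
    # With the CMS collector, class unloading additionally requires -XX:+CMSClassUnloadingEnabled;
    # if the JIT code cache is suspected, its size can be raised with -XX:ReservedCodeCacheSize.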




Reply | Threaded
Open this post in threaded view
|

Re: Imbalanced workload between workers

Pieter Hameete
Hi Stephan,

Thank you for your time looking into this. I might be able to check those things later if you think it would be worthwhile to track down the source of this problem.

For me, Google Cloud is getting too expensive to use right now. Instead, I have found a YARN cluster that I can use for free, and I'm hoping that getting fresh containers for each job will be a good workaround for now.

Cheers,

- Pieter
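
A per-job submission to YARN, which gives every run fresh containers, looks roughly like this with the Flink CLI of that era; the container counts and memory sizes are placeholders, and the program arguments follow the GCloud command shown earlier in the thread:

    ./bin/flink run -m yarn-cluster -yn 4 -ys 4 -yjm 2048 -ytm 12288 \
        dawn-flink.jar <input-path> <output-path> 3 result.csv

Since the YARN session is torn down after each job, any state that accumulates inside a long-running TaskManager JVM is discarded between runs, which is exactly the workaround Pieter describes.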


