Backpressure tuning/failure

Backpressure tuning/failure

Owen Rees-Hayward
Hi,

I am having a few issues with the Flink (v1.8.1) backpressure default settings, which lead to poor throughput in a comparison I am doing between Storm, Spark and Flink.

I have a setup that simulates a progressively worse straggling task that Storm and Spark cope with relatively well. Flink not so much. Code can be found here - https://github.com/owenrh/flink-variance.

See this throughput chart for an idea of how badly - https://owenrh.me.uk/assets/images/blog/smackdown/flink-constant-straggler.png

I do not have any production experience with Flink, but I have had a look at the Flink docs and there is nothing in there that jumps out at me to explain or address this. I presume I am missing something, as I cannot believe Flink is this weak in the face of stragglers. It must be configuration, right?

Would appreciate any help on this. I've got a draft blog post that I will publish in a day or two, and don't want to criticise the Flink backpressure implementation for what is most likely a default configuration issue.

Thanks in advance, Owen

--
Owen Rees-Hayward
07912 876046
twitter.com/owen4d

Re: Backpressure tuning/failure

Piotr Nowojski-3
Hi,

I’m not entirely sure what you are testing. I have looked at your code (only the constant straggler scenario) and, please correct me if I’m wrong, in your job you are basically measuring the throughput of `Thread.sleep(straggler.waitMillis)`.

In the first RichMap task (`subTaskId == 0`), you sleep for 50 ms on every record, so once all of the network buffers have filled up, your whole job will be bottlenecked by this throughput cap of 20 records / second. Every so often this struggling task will be able to process a record and free up some buffer from the backlog. This briefly unblocks the other three tasks (which are capped at 133 records / second). Apart from those short stints, the other tasks cannot process a constant 133 records / second, because records are evenly distributed by the source between all of the tasks. I think this is clearly visible on the charts, and every system would behave in exactly the same way.
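
The arithmetic in the paragraph above can be checked with a short sketch. It rests on a simplified model that is an assumption, not something taken from the benchmark code: four parallel tasks (one straggler plus the "other three"), strictly even distribution of records, and a sender that blocks once the straggler's buffers are full. The class and method names are hypothetical.

```java
public class StragglerThroughput {

    /** Records per second a task can process if it sleeps sleepMillis per record. */
    public static double ratePerSecond(double sleepMillis) {
        return 1000.0 / sleepMillis;
    }

    /**
     * Assumed model: records are spread evenly and the sender blocks on a full
     * channel, so every task ends up being fed at the slowest task's rate.
     */
    public static double evenlyDistributedThroughput(double[] taskRates) {
        double slowest = Double.MAX_VALUE;
        for (double rate : taskRates) {
            slowest = Math.min(slowest, rate);
        }
        return slowest * taskRates.length;
    }

    /** Upper bound if every task could be kept busy at its own rate. */
    public static double sumOfCapacities(double[] taskRates) {
        double total = 0.0;
        for (double rate : taskRates) {
            total += rate;
        }
        return total;
    }

    public static void main(String[] args) {
        double straggler = ratePerSecond(50); // sleep(50ms) per record
        double healthy = 133.0;               // cap quoted in the thread
        double[] rates = {straggler, healthy, healthy, healthy};

        System.out.println("straggler rate:       " + straggler);
        System.out.println("slowdown factor:      " + healthy / straggler);
        System.out.println("evenly distributed:   " + evenlyDistributedThroughput(rates));
        System.out.println("if all stayed busy:   " + sumOfCapacities(rates));
    }
}
```

Under these assumptions the aggregate settles at roughly four times the straggler's rate, well below what the three healthy tasks could handle on their own.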

But what scenario are you really trying to simulate? 

A data skew where one task is 6.65 (133 / 20) times more overloaded, or processing heavier records, than the others? Yes, this is expected behaviour, but your benchmark is testing this in a somewhat convoluted way.

A failing machine which has 6.65 times less performance? With keyed network exchanges there is again very little that you can do (except for speculative execution). Without keyed network exchanges, OK, I agree. In this case, randomly/evenly distributing the records is not the optimal shuffling strategy and there is some room for improvement in Flink (we could distribute records not randomly but to the less busy machines). However, this is pretty much a niche feature (failing machine + non-keyed exchanges) and you are not saying anywhere that this is what you are testing for.
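
To illustrate the difference between distributing records evenly and sending them "to the less busy machines", here is a toy tick-based simulation. It is only a sketch under stated assumptions and does not reflect how Flink's network stack works: one producer emits at most one record per tick (think 1 ms), each task has a bounded queue, and the producer blocks when its chosen queue is full. The service times of 50 and 8 ticks approximate the 20 and 133 records / second figures above. All names are hypothetical.

```java
public class ChannelSelectionSketch {

    /** Picks the channel with the fewest queued records; ties go to the lowest index. */
    public static int leastBacklog(int[] backlog) {
        int best = 0;
        for (int i = 1; i < backlog.length; i++) {
            if (backlog[i] < backlog[best]) {
                best = i;
            }
        }
        return best;
    }

    /**
     * Runs the toy model for the given number of ticks and returns how many
     * records were fully processed. Task i needs serviceTicks[i] ticks per record.
     */
    public static long simulate(int[] serviceTicks, int capacity, int ticks, boolean loadBased) {
        int n = serviceTicks.length;
        int[] backlog = new int[n];   // queued records per task, including the one in service
        int[] remaining = new int[n]; // ticks left on the record currently in service
        int next = 0;                 // round-robin cursor
        long processed = 0;

        for (int t = 0; t < ticks; t++) {
            // Producer: pick a target channel; emit nothing this tick if it is full.
            int target = loadBased ? leastBacklog(backlog) : next;
            if (backlog[target] < capacity) {
                backlog[target]++;
                if (!loadBased) {
                    next = (next + 1) % n;
                }
            }
            // Consumers: each task works on the head of its own queue.
            for (int i = 0; i < n; i++) {
                if (backlog[i] == 0) {
                    continue;
                }
                if (remaining[i] == 0) {
                    remaining[i] = serviceTicks[i]; // start a new record
                }
                if (--remaining[i] == 0) {
                    backlog[i]--;
                    processed++;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] serviceTicks = {50, 8, 8, 8}; // one straggler, three healthy tasks
        long roundRobin = simulate(serviceTicks, 10, 10_000, false);
        long loadBased = simulate(serviceTicks, 10, 10_000, true);
        System.out.println("round-robin processed: " + roundRobin);
        System.out.println("load-based processed:  " + loadBased);
    }
}
```

In this model the round-robin run is held back by the straggler's queue, while the load-based run keeps the healthy tasks busy. It only applies to non-keyed exchanges, where any task may process any record.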

Piotrek


Re: Backpressure tuning/failure

Owen Rees-Hayward
Hi Piotr,

Thanks for getting back to me and for the info. I tried to describe the motivation around the scenarios in the original post in the series - see the 'Backpressure - why you might care' section on http://owenrh.me.uk/blog/2019/09/30/. Maybe it could have been clearer.

As you note, this will not affect every Flink job. However, one person's niche is another person's day job. I definitely agree that keyed network exchanges, which are going to be the majority of analytics queries, are in a different problem space. However, this is not an uncommon scenario in ingest pipelines.

I'd be interested to know whether you saw the section in the post I referred to above and whether this clears anything up? To clarify, the code is attempting to simulate a straggler node due to high load, which therefore processes data at a slower rate - not a failing node. Some degree of this is a feature of multi-tenant Hadoop. 

Cheers, Owen


Re: Backpressure tuning/failure

Piotr Nowojski-3
Hi Owen,

Thanks for the quick response. No, I hadn’t seen the previous blog post, and yes, it clears things up a bit.

> To clarify, the code is attempting to simulate a straggler node due to high load, which therefore processes data at a slower rate - not a failing node. Some degree of this is a feature of multi-tenant Hadoop.

In your benchmark you are manually slowing down just one TaskManager, so you are testing for the failing/slow machine case, where either:
1. the machine is slow on its own because it’s smaller than the others,
2. it’s overloaded by some service independent of Flink,
3. it's a failing node.

Out of those three options, the first two are not supported by Flink, in the sense that Flink assumes more or less equal machines in the cluster. The third is, as I wrote in the previous response, a pretty uncommon scenario (until you reach really huge scale). How often does one of your machines fail in a way that makes it 6.6 times slower than the others? I agree Flink doesn’t handle this automatically at the moment (currently you would be expected to manually shut down the machine). Nevertheless, there are some plans to address this (speculative execution and load-based channel selection), but with no definite schedule.

Also, if the issue is "multi-tenant Hadoop", I would first try to better assign resources in the cluster, using for example CGroups via YARN/LXC/Docker, or virtual machines.

Cheers, Piotrek


Re: Backpressure tuning/failure

Owen Rees-Hayward
Hey Piotr,

I think we are broadly in agreement, hopefully.

So out of the three scenarios you describe, the code is simulating scenario 2). The only additional comment I would make to this is that the additional load on a node could be an independent service or job. 

I am guessing we can agree that, in the context of multi-tenant Hadoop, this is quite common? For instance, assuming Flink is deployed on the datanodes, I could see the following as a few examples:
  • another tenant runs a heavy batch job that overlaps with our streaming datanodes
  • someone runs a juicy ad hoc Hive query which overlaps with our datanodes
  • HBase performs compaction or replica movement on some of our datanodes
Now in an ideal world, I might have a dedicated cluster or be deployed in the cloud. Then I have an easier life. However, there are lots of data engineers operating in challenging multi-tenant Hadoop environments, where life is not so easy : o
 
You stated that Flink does not support scenario 2. Typically, Spark is deployed onto the datanodes for data-locality. I had assumed the same would be true for Flink. Is that assumption incorrect?

Cheers, Owen
