Flink AutoScaling EMR

7 messages

Flink AutoScaling EMR

Rex Fenley
Hello,

I'm trying to find a solution for auto-scaling our Flink EMR cluster with zero downtime, using RocksDB for state storage and S3 as the backing store.

My current thoughts are as follows (see the sketch after this list):
* Scaling an operator dynamically would require all keyed state to be available to that operator's set of subtasks, so the subtasks would have to be reading from and writing to the same RocksDB instance. I.e., to scale subtasks in that set in and out, they need to read from the same RocksDB.
* Since subtasks can run on different core nodes, is it possible to have different core nodes read/write the same RocksDB?
* When is it safe to scale an operator in and out? Possibly only right after a checkpoint?
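
For what it's worth, rescaling in Flink does not require subtasks to share a RocksDB instance: keyed state is hashed into a fixed number of key groups, and when the job is restored from a savepoint or checkpoint those key groups are redistributed across the new set of subtasks, bounded by the job's max parallelism. A minimal sketch of pinning that bound (the pipeline and names are purely illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescalableJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state is hashed into key groups; maxParallelism fixes how many key groups
        // exist, and a restore from a savepoint/checkpoint reassigns whole key groups to
        // the new subtasks. Each subtask keeps its own local RocksDB; nothing is shared.
        env.setMaxParallelism(128); // upper bound for any future rescale of this job
        env.setParallelism(4);      // current parallelism; may be different after a restore

        env.fromElements(1, 2, 3, 4)
            .keyBy(v -> v % 2)          // keyed state for downstream operators lives in key groups
            .reduce((a, b) -> a + b)    // uses keyed state, redistributed on rescale
            .print();

        env.execute("rescalable-job");
    }
}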

If the above is not possible, then we'll have to use savepoints, which means some downtime. Therefore:
* Scaling out during high traffic is arguably more important to react to quickly than scaling in during low traffic. Is it possible to add more core nodes to EMR without disturbing a job? If so, then maybe we can orchestrate running a new job on the new nodes and then loading a savepoint taken from the currently running job.

Lastly:
* Savepoints of ~70 GiB take on the order of minutes to tens of minutes for us to restore from. Is there any way to speed up restoration?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com


Re: Flink AutoScaling EMR

Rex Fenley
Another thought: would it be possible to
* spin up new core or task nodes,
* run a new copy of the same job on those new nodes from a savepoint, and
* have the new job not write to the sink until the other job is torn down?

This would let us stay eventually consistent and keep writes flowing without downtime. As long as whatever is buffering the sink doesn't run out of space, it should work just fine.

Ideally, we're hoping to achieve consistency in less than 30s.

Again, though, if we could get savepoints to restore in less than 30s, that would likely be sufficient for our purposes.

Thanks!



Re: Flink AutoScaling EMR

rmetzger0
Hey Rex,

The second approach (spinning up a standby job and then doing a handover) sounds more promising to implement without rewriting half of the Flink codebase ;)
What you need is a tool that orchestrates creating a savepoint, starting a second job from that savepoint, and then communicating with a custom sink implementation that can be switched on/off in the two jobs.
With that approach, you should have almost no downtime, just increased resource requirements during such a handover.
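
To make the sink part concrete, here is a rough sketch of what such a switchable sink wrapper could look like. This is not an existing Flink class; the file-based flag is purely illustrative (in practice ZooKeeper or a config service would be a better flag store), and a Rich delegate would additionally need its lifecycle methods forwarded:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Wraps the real sink and only forwards records while an external flag marks this
// job as the active one; the standby job keeps its flag "off" until handover.
public class SwitchableSink<T> extends RichSinkFunction<T> {

    private final SinkFunction<T> delegate; // the real sink (Elasticsearch, JDBC, ...)
    private final String flagPath;          // hypothetical location of the on/off flag
    private transient volatile boolean active;
    private transient long lastCheckMillis;

    public SwitchableSink(SinkFunction<T> delegate, String flagPath) {
        this.delegate = delegate;
        this.flagPath = flagPath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        refreshFlag();
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        // Re-read the flag at most once per second to avoid hammering the flag store.
        if (System.currentTimeMillis() - lastCheckMillis > 1000) {
            refreshFlag();
        }
        if (active) {
            delegate.invoke(value, context);
        }
        // If inactive, records are dropped here; the other (active) job writes them.
    }

    private void refreshFlag() throws Exception {
        Path p = Paths.get(flagPath);
        active = Files.exists(p)
                && "on".equals(new String(Files.readAllBytes(p), StandardCharsets.UTF_8).trim());
        lastCheckMillis = System.currentTimeMillis();
    }
}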

What you need to consider as well is that this handover process only works for scheduled maintenance. If you have a system failure, you'll have downtime until the last checkpoint is restored.
If you are trying to reduce the potential downtime overall, I would rather recommend optimizing the checkpoint restore time, as this can cover both scheduled maintenance and system failures.
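
If restore time is the main concern, two settings worth checking are incremental RocksDB checkpoints (each checkpoint uploads only new SST files, and restoring from a retained incremental checkpoint is usually faster than restoring from a full canonical savepoint) and task-local recovery (state.backend.local-recovery: true in flink-conf.yaml), which lets tasks recover from a local state copy after a failure instead of re-downloading everything from S3; note that local recovery does not help when rescaling. A hedged sketch of the programmatic part, with an illustrative bucket name (requires the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB backend with incremental checkpoints enabled (second constructor arg):
        // each checkpoint uploads only the SST files created since the previous one.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", true);
        env.setStateBackend(backend);

        // Checkpoint every minute so that, after a failure, at most about a minute of
        // input has to be reprocessed on top of the state restore time.
        env.enableCheckpointing(60_000);

        // ... job definition and env.execute(...) as usual ...
    }
}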

Best,
Robert







Re: Flink AutoScaling EMR

Rex Fenley
Awesome, thanks! Is there a way to run the new YARN job only on the new hardware? Or would the two jobs have to run on intersecting hardware and then be switched on/off, which means we'd need a buffer of resources for our orchestration?

Also, good point on recovery. I'll spend some time looking into this.

Thanks




Re: Flink AutoScaling EMR

rmetzger0
Hi,
It seems that YARN has a feature for targeting specific hardware: https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html
In any case, you'll need enough spare resources for some time to be able to run your job twice for this kind of "zero-downtime handover".



Re: Flink AutoScaling EMR

Xintong Song
"Is there a way to run the new YARN job only on the new hardware?"
I think you can simply decommission the nodes from YARN, so that new containers will not be allocated on those nodes. You might also need a large decommission timeout, after which all the containers still running on the decommissioning nodes will be killed.

Thank you~

Xintong Song





Re: Flink AutoScaling EMR

Rex Fenley
Thanks for all the input!
