Flink auto-scaling feature and documentation suggestions

Flink auto-scaling feature and documentation suggestions

vishalovercome
Thank you very much for the new release that makes auto-scaling possible. I'm
currently running multiple Flink jobs and I've hand-tuned the parallelism of
each of the operators to achieve the best throughput. I would much rather
use the auto-scaling capabilities of Flink than have to hand-tune my jobs,
but it seems there are a few gaps:

1. Setting max parallelism seems to be the only user-controlled knob at the
moment. As Flink tries to achieve operator chaining by launching the same
number of sub-tasks for each operator, I'm afraid the current auto-scaling
will be very inefficient. At a minimum, we need to support user-provided
ratios that will be used to distribute sub-tasks among operators. E.g. O1:O2
= 4:1 would mean that 4 sub-tasks of O1 should be started for each sub-task
of O2 (a rough sketch of the hand tuning I do today follows this list).

2. Allow an external system to set the parallelism of operators. Perhaps the
JobManager's REST API could be extended to support such scaling requests.

3. The doc says that local recovery doesn't work. This makes sense when a
restart is due to a scaling action, but I couldn't quite understand why that
needs to be the case when a TaskManager is recovering from a crash.

4. Is there any metric that allows us to distinguish between a restart due
to scaling and a restart due to some other reason? Based on the section on
limitations, there isn't, but it would be good to add one, as people will
eventually want to monitor and alert on restarts due to failures alone.

5. Suppose the number of containers is fixed and the job is running. Will
Flink internally rebalance by adding sub-tasks of one operator and removing
sub-tasks of another? This could be driven by back-pressure, for instance.
The doc doesn't mention this, so I'm assuming that current scaling is
designed to maximize operator chaining. However, it does make sense to
incorporate back-pressure into rebalancing. Should future versions of
auto-scaling work this way, we'll need some toggles to avoid restart loops.

6. How is the implementation different from taking a savepoint and manually
rescaling? Are there any operator-specific gotchas that we should watch out
for? For instance, we use the AsyncIO operator and wanted to know how
in-flight requests to a database would be handled when its parallelism
changes.

7. If maxParallelism needs to be set to control parallelism, then wouldn't
that mean that we wouldn't ever be able to take a savepoint and rescale
beyond the configured maxParallelism? This would mean that we can never
achieve hand-tuned resource efficiency. I will need to set maxParallelism
beyond the current parallelism, and given the current tendency to allocate
the same number of sub-tasks for each operator, I will inevitably end up
with several under-utilized operators.
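
To make (1) concrete, here is a rough sketch of the kind of hand tuning I do
today. Everything here - operator bodies, names, numbers - is made up for
illustration; only the per-operator setParallelism() calls are the point:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class HandTunedParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Source kept at parallelism 1 so downstream services are not flooded.
            DataStream<Long> source = env.fromSequence(0, 1_000_000_000L).setParallelism(1);

            // "O2": talks to internal services, deliberately low parallelism.
            DataStream<Long> enriched = source
                    .map((Long x) -> x).returns(Types.LONG)   // stand-in for the real enrichment call
                    .setParallelism(2);

            // "O1": the expensive windowing operator, 4:1 relative to O2.
            DataStream<Long> windowed = enriched
                    .keyBy((Long x) -> x % 1024, Types.LONG)
                    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                    .reduce((Long a, Long b) -> a + b)
                    .setParallelism(8);

            windowed.print().setParallelism(1);
            env.execute("hand-tuned-parallelism-sketch");
        }
    }

With the current auto-scaling, as far as I understand it, all of these
operators would instead end up with the same number of sub-tasks.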

Once again, thank you for your continued support!

Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
Some questions about the adaptive scheduler documentation - "If new slots become available the job will be scaled up again, up to the configured parallelism".

1. Does parallelism here refer to maxParallelism or parallelism? I'm guessing it's the latter, because the doc later mentions - "In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible." (A small sketch of the two settings follows these questions.)

2. The other question I have is along the same lines as the one mentioned earlier - what strategy is used to allocate sub-tasks? Is it the ratio of the configured parallelisms, or does it try to achieve as much operator chaining as possible?

3. If we enable adaptive scheduling with reactive mode and specify both parallelism and max parallelism, it seems that operators will not be scaled beyond the configured parallelism even if slots are available. In this case, wouldn't adaptive scheduling only be useful for downscaling jobs (and eventually scaling them back up)?
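
For reference, a minimal sketch of how I am setting the two values today
(values are arbitrary); my reading of the docs is that the adaptive scheduler
scales up to the configured parallelism, while reactive mode ignores it and
uses maxParallelism as the ceiling:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismSettingsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Upper bound for rescaling; this also fixes the number of key groups,
            // so it cannot be raised later without breaking keyed-state compatibility.
            env.setMaxParallelism(128);

            // Configured parallelism: the adaptive scheduler (without reactive mode)
            // scales up to this value; reactive mode treats it as if it were infinite.
            env.setParallelism(8);

            env.fromSequence(0, 100).print();
            env.execute("parallelism-settings-sketch");
        }
    }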


Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
In reply to this post by vishalovercome
Forgot to add one more question - 7. If maxParallelism needs to be set to
control parallelism, then wouldn't that mean that we wouldn't ever be able
to take a savepoint and rescale beyond the configured maxParallelism? This
would mean that we can never achieve hand-tuned resource efficiency. I will
need to set maxParallelism beyond the current parallelism, and given the
current tendency to allocate the same number of sub-tasks for each operator,
I will inevitably end up with several under-utilized operators.




Re: Flink auto-scaling feature and documentation suggestions

David Anderson-4
Could you describe a situation in which hand-tuning the parallelism of individual operators produces significantly better throughput than the default approach? I think it would help this discussion if we could have a specific use case in mind where this is clearly better.

Regards,
David


Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
In one of my jobs, windowing is the costliest operation, while upstream and
downstream operators are not as resource intensive. There's another operator
in this job that communicates with internal services. This has high
parallelism as well, but not as much as that of the windowing operation.
Running all operators with the same parallelism as the windowing operation
would choke some of our internal services, as we'd be consuming from our
source at a rate much higher than what our internal services can handle.
Thus our sources, sinks, and validation and monitoring related operators
have very low parallelism, while one operator has high parallelism and
another has even higher parallelism.




Re: Flink auto-scaling feature and documentation suggestions

David Anderson-3
Interesting. So if I understand correctly, basically you limited the parallelism of the sources in order to avoid running the job with constant backpressure, and then scaled up the windows to maximize throughput. 


Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
Yes. While back-pressure would eventually ensure high throughput, hand-tuning
parallelism became necessary because the job with high source parallelism
would immediately bring down our internal services - not giving Flink enough
time to adjust the in-rate. Plus, running all operators at such a high scale
would result in wastage of resources, even with operator chaining in place.

That's why I think more toggles are needed to make the current auto-scaling
truly shine.




Re: Flink auto-scaling feature and documentation suggestions

David Anderson-4
Well, I was thinking you could have avoided overwhelming your internal services by using something like Flink's async i/o operator, tuned to limit the total number of concurrent requests. That way the pipeline could have uniform parallelism without overwhelming those services, and then you'd rely on backpressure to throttle the sources. I'm not saying that would be better -- it's arguably worse to have constant backpressure.
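
Roughly the shape I have in mind - a sketch only, with a made-up lookup and
arbitrary numbers; the point is the capacity argument, which bounds the number
of in-flight requests per subtask (so the total is capacity times the
operator's parallelism):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class ThrottledAsyncIoSketch {

        // Stand-in for the real call to the internal service.
        static class InternalServiceLookup extends RichAsyncFunction<Long, String> {
            @Override
            public void asyncInvoke(Long input, ResultFuture<String> resultFuture) {
                CompletableFuture
                        .supplyAsync(() -> "result-for-" + input)   // hypothetical client call
                        .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Long> input = env.fromSequence(0, 1_000);

            // timeout = 5s, capacity = 50: at most 50 requests in flight per subtask.
            DataStream<String> enriched = AsyncDataStream.unorderedWait(
                    input, new InternalServiceLookup(), 5, TimeUnit.SECONDS, 50);

            enriched.print();
            env.execute("throttled-async-io-sketch");
        }
    }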

But I don't understand this point:

> running all operators at such a high scale would result in wastage of resources, even with operator chaining in place.

Don't you have the same number of slots, each with the same resources, either way? Plus, don't you have to do more ser/de and more networking?


Re: Flink auto-scaling feature and documentation suggestions

Ken Krugler
In reply to this post by vishalovercome
Hi Vishal,

WRT “bring down our internal services” - a common pattern with making requests to external services is to measure latency, and throttle (delay) requests in response to increased latency.

You’ll see this discussed frequently on web crawling forums as an auto-tuning approach.

Typically there’s a steady increase in latency as load on the service increases.

The trick is throttling soon enough before you hit the “elbow” where a service effectively falls over.
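
A very rough sketch of the idea inside a Flink async lookup - the thresholds,
the delay, and the client call are all made up, and the latency bookkeeping is
deliberately crude:

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Track a moving average of service latency and back off before each request
    // once it crosses a threshold, i.e. throttle before the "elbow".
    public class LatencyThrottledLookup extends RichAsyncFunction<Long, String> {

        private transient volatile double avgLatencyMs;

        @Override
        public void asyncInvoke(Long input, ResultFuture<String> resultFuture) throws Exception {
            if (avgLatencyMs > 200.0) {
                // Crude throttle: blocks the task thread; a timer-based delay would be kinder.
                Thread.sleep((long) Math.min(avgLatencyMs, 1_000));
            }

            long start = System.nanoTime();
            CompletableFuture
                    .supplyAsync(() -> "result-for-" + input)        // hypothetical client call
                    .thenAccept(result -> {
                        long latencyMs = (System.nanoTime() - start) / 1_000_000;
                        // Exponential moving average; racy, but good enough for a heuristic.
                        avgLatencyMs = 0.9 * avgLatencyMs + 0.1 * latencyMs;
                        resultFuture.complete(Collections.singleton(result));
                    });
        }
    }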

— Ken




--------------------------
Ken Krugler
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch




Re: Flink auto-scaling feature and documentation suggestions

Till Rohrmann
Hi Vishal,

thanks a lot for all your feedback on the new reactive mode. I'll try to answer your questions.

0. In order to avoid confusion let me quickly explain a bit of terminology: The reactive mode is the new feature that allows Flink to react to newly available resources and to make use of them. In order to achieve this, it uses the newly introduced AdaptiveScheduler, which works by declaring the required resources and adapting the job if it loses slots, or if it receives slots while the job is running below its configured parallelism. The AdaptiveScheduler can also be used w/o the reactive mode, which would give you the capability for Flink to continue executing your job if your cluster loses TaskManagers (with the default scheduler, the job would then fail with not enough resources). The difference between the reactive and non-reactive mode is that in the reactive mode, Flink ignores the configured parallelism value and tries to run the job at the maxParallelism (so to say, the maximum possible parallelism your job can be run with).

1. The AdaptiveScheduler and thereby also the reactive mode uses a simple slot distribution mechanism where every slot sharing group gets the same number of slots. The parallelism of an operator in this slot sharing group is then the minimum of the number of slots and the configured parallelism (when using the reactive mode it would be the configured maxParallelism). This is of course not ideal and can lead to unused slots. Moreover, it does not support scaling different slot sharing groups independently. This is a limitation of the current implementation.

2. With the reactive mode, external systems can control the parallelism with which a job is running by starting and stopping TaskManagers. Admittedly, this does not give you super fine-grained control over the running operators. Being able to specify ratios for operators could be a good extension.

3. Local recovery simply has not been implemented because of scoping reasons. There is nothing fundamentally preventing this from working, it just hasn't been implemented yet.

4. No, there are currently no separate metrics for restarts and rescaling operations. I do see the usefulness of such a metric. However, for the reactive mode where scaling down means that we kill a TaskManager, I am not entirely sure how we will be able to distinguish this from any other reason which can kill a TaskManager. The only way I could see this work is by telling Flink about the killing of a TaskManager.

5. No, Flink is not able to do this kind of operator-based optimization at the moment. I think this could be possible once we have implemented the next step of the reactive mode, which is proper auto-scaling.

6. From a high-level perspective, there is not much difference between the reactive mode and manually taking a savepoint, resuming the job from it, and changing the parallelism. That's effectively also what Flink does internally. The difference is that this operation is now automated and that Flink can also handle situations where you don't get all the resources back after a restart, where the manual approach would simply fail.

7. Setting the maxParallelism is only required for the reactive mode and if you don't want to run an operator with the default maxParallelism value. Per definition, the maxParallelism defines the maximum parallelism you can run your operator with. Hence, if you set this value, then you should be sure that you won't have to run your job with a higher parallelism than that. Note that the reactive mode will try to run the operator with this parallelism. However, if it has fewer resources, then it will run the operators at a lower parallelism. So the maxParallelism defines the upper bound for the parallelism of your operator.
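
A small sketch of these knobs in code (the numbers and the slot sharing group name are arbitrary):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReactiveModeKnobsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Job-wide upper bound: reactive mode tries to run operators at their
            // maxParallelism when enough slots exist, and lower otherwise.
            env.setMaxParallelism(64);

            DataStream<Long> source = env.fromSequence(0, 1_000_000);

            DataStream<Long> expensive = source
                    .map((Long x) -> x * 2).returns(Types.LONG)
                    // maxParallelism can also be set per operator, as a lower ceiling.
                    .setMaxParallelism(32)
                    // Every slot sharing group gets the same number of slots.
                    .slotSharingGroup("heavy");

            expensive.print();
            env.execute("reactive-mode-knobs-sketch");
        }
    }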

The reactive mode is intended as a first step towards more elastic streaming pipelines and simplified operations. We see it as the foundation for more advanced features such as true auto-scaling, where each operator can decide its own parallelism. I hope this helps to understand the reactive mode a bit better.

Cheers,
Till


Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
In reply to this post by David Anderson-4
I am using the async IO operator. The problem is that increasing source
parallelism from 1 to 2 was enough to tip our systems over the edge.
Reducing the parallelism of the async IO operator to 2 is not an option, as
that would reduce the throughput quite a bit. This means that no matter what
we do, we'll end up with different operators having different parallelism.

What I meant with "running all operators at such a high scale would result
in wastage of resources, even with operator chaining in place" was that
creating as many sub-tasks for each of my operators as the windowing
operator has would lead to sub-optimal performance. While chaining would
ensure that all tasks run in one slot, the partitioning of data would result
in the same network IO, as chaining doesn't guarantee that the same tuple is
processed in one slot.

In my experience, running all operators with the same parallelism is always
inferior to hand-tuned parallelism.
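
A tiny sketch of what I mean (illustrative only): even when both sides run at
the same parallelism, the keyBy below repartitions by key, so records are
serialized and possibly shipped over the network between sub-tasks, whether
or not the surrounding operators are chained.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingVsShuffleSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromSequence(0, 1_000)
                    .map((Long x) -> x).returns(Types.LONG)
                    .setParallelism(4)
                    // Hash partition by key: ser/de and, potentially, network
                    // transfer even though both sides run at parallelism 4.
                    .keyBy((Long x) -> x % 16, Types.LONG)
                    .reduce((Long a, Long b) -> a + b)
                    .setParallelism(4)
                    .print();

            env.execute("chaining-vs-shuffle-sketch");
        }
    }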




Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
In reply to this post by Ken Krugler
We do exactly what you mentioned. However, it's not that simple,
unfortunately. Our services don't have predictable performance, as traffic
varies a lot during the day.

As I've explained above, increasing source parallelism to 2 was enough to
tip over our services, and reducing the parallelism of the async operator to
2 is not an option.




Re: Flink auto-scaling feature and documentation suggestions

vishalovercome
In reply to this post by Till Rohrmann
Thank you for answering all my questions. My suggestion would be to start
off by exposing an API to allow dynamically changing operator parallelism,
as the users of Flink will be better able to decide the right scaling
policy. Once this functionality is there, it's just a matter of providing
policies (ratio based, throughput based, back-pressure based). The web UI
could be used for setting parallelism as well (a strawman sketch of such a
hook follows below).

An analogy would be the autoscaling provided by cloud providers. The
features provided are:
1. Web UI for manually overriding parallelism (min, max, desired)
2. Metric based scaling policies

It will be difficult for developers to think of a reasonable value of
maxParallelism for each operator and, as I explained above, sometimes even a
small increase in parallelism is enough to bring things down. A UI /
external-policy based approach will allow for quick experimentation and fine
tuning. I don't think it will be possible for Flink developers to build a
one-size-fits-all solution.
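
Purely as a strawman - none of this exists in Flink today and every name
below is invented - the kind of user-pluggable hook I am imagining:

    import java.util.Map;

    /**
     * Hypothetical interface, NOT an existing Flink API: Flink would periodically
     * hand the policy a few metrics per operator and get back desired
     * parallelisms, which it then applies via rescaling (bounded by each
     * operator's maxParallelism).
     */
    public interface ScalingPolicy {

        /** Metrics Flink would supply for one operator (all fields invented). */
        class OperatorMetrics {
            public final int currentParallelism;
            public final double busyTimeRatio;      // 0.0 .. 1.0
            public final double backPressureRatio;  // 0.0 .. 1.0

            public OperatorMetrics(int currentParallelism, double busyTimeRatio, double backPressureRatio) {
                this.currentParallelism = currentParallelism;
                this.busyTimeRatio = busyTimeRatio;
                this.backPressureRatio = backPressureRatio;
            }
        }

        /** Desired parallelism per operator id for the next interval. */
        Map<String, Integer> decideParallelism(Map<String, OperatorMetrics> metricsPerOperator);
    }

Ratio-based, throughput-based and back-pressure-based policies would then
just be different implementations of this interface, and an external system
(or the web UI) could plug in its own.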




Re: Flink auto-scaling feature and documentation suggestions

Till Rohrmann
Yes, exposing an API to adjust the parallelism of individual operators is definitely a good step towards the auto-scaling feature, and one which we will consider. The missing piece is persisting this information so that in case of recovery you don't recover with a completely different parallelism.

I also agree that it will be very hard for Flink to decide on the best parallelism in general. To do that, you usually need to know a bit about the application logic. Hence, outsourcing this problem to the user, who can make better decisions, is a very good idea.

The community will keep improving this feature so that with the next releases it should become more powerful.

Cheers,
Till
