Dynamically allocating right-sized task resources


Dynamically allocating right-sized task resources

Chad Dombrova
Hi all,
First time poster, so go easy on me :)

What is Flink's story for accommodating task workloads with vastly disparate resource requirements, e.g. some requiring very little CPU and RAM while others require quite a lot?

Our current strategy is to bundle resource-intensive tasks and send them to a different batch-execution framework.  For this we use AWS | Thinkbox Deadline [1].  Deadline's scheduler supports numerous strategies for pairing work with a right-sized worker -- criteria (numeric metadata like min/max RAM and CPU requirements) and pools (basically named resource tags) -- as well as for deciding when to schedule tasks -- priorities and limits (a check-in/check-out system for finite resources, like a software license).  Are others using a similar strategy, or are you provisioning your task managers for the worst-case scenario?

Outsourcing to a separate batch framework for resource intensive tasks complicates the design of our pipeline and bifurcates our resource pool, so I'd rather use Flink for the whole process. I searched around and found two Jira tickets which could form the foundations of a solution to this problem:

- https://issues.apache.org/jira/browse/FLINK-9953: Active Kubernetes integration
- https://issues.apache.org/jira/browse/FLINK-10240: Pluggable scheduling strategy for batch jobs

Sadly, the latter seems to be stalled.

I read the design doc [2] for the active K8s integration, and this statement seemed crucial:

> If integrated with k8s natively, Flink’s resource manager will call kubernetes API to allocate/release new pods and adjust the resource usage on demand, and then support scale-up and scale down.

This is particularly powerful when your k8s cluster is itself backed by auto-scaling of nodes (as with GKE autoscaler [3]), but it's unclear from the doc *when and how* resources are adjusted based on demand.  Will it simply scale up a shared pool of resource-identical task managers based on the size of the task backlog (or some other metric that determines "falling behind"), or does a task have a way of specifying and acquiring an execution resource that meets its specific performance profile?  

Based on the docs for the YARN resource manager [4], it acquires a pool of task managers with identical specs, so if this model is also used for the K8s resource manager, task managers would continue to be provisioned for the worst-case scenario (particularly in terms of RAM per process), which for us would mean they are drastically over-provisioned for common tasks. 

I'm new to Flink, so there's a good chance I've overlooked something important; I'm looking forward to learning more!

-thanks
chad







Re: Dynamically allocating right-sized task resources

Xintong Song
Hi Chad,

If I understand correctly, the scenarios you talked about are running batch jobs, right?

At the moment (Flink 1.8 and earlier), Flink does not differentiate between the workloads of different tasks. It uses a slot-sharing approach [1] to balance workloads among workers. The general idea is to put tasks with different workloads into the same slot and spread tasks with similar workloads across different slots, resulting in slots with a similar total workload. This approach works fine for streaming jobs, where all tasks are running at the same time. However, it might not work as well for batch jobs, where tasks are scheduled stage by stage.
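
For example, here is a minimal sketch (assuming the Java DataStream API; the socket source and the enrichment UDF are just placeholders) of how a heavy operator can be pulled out of the default slot sharing group so that its subtasks do not share slots with the lightweight ones:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Lightweight parsing stays in the default slot sharing group ("default").
        DataStream<String> parsed = env
                .socketTextStream("localhost", 9999)   // placeholder source
                .map(String::trim)
                .name("parse");

        // The heavier operator gets its own slot sharing group, so its subtasks
        // are placed in separate slots and do not compete for memory/CPU with
        // the cheap tasks above.
        parsed
                .map(SlotSharingExample::expensiveEnrichment)   // placeholder heavy UDF
                .name("enrich")
                .slotSharingGroup("heavy")
                .print();

        env.execute("slot sharing sketch");
    }

    // Stand-in for an expensive per-record computation.
    private static String expensiveEnrichment(String value) {
        return value.toUpperCase();
    }
}

Note that this only controls how tasks are grouped into the slots Flink already has; the slots themselves still come from identically-sized task managers.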

You can also refer to the resource management strategy in the Blink branch. Blink is the internal version of Flink at Alibaba, which was open sourced earlier this year. It customizes task manager resources (on YARN) according to the tasks' resource requirements. The community and Alibaba are currently working together to bring the good features of Blink into Flink master. One of those is fine-grained resource management, which should help resolve resource management and load-balancing issues for both streaming and batch jobs.


Thank you~

Xintong Song




Re: Dynamically allocating right-sized task resources

Yang Wang
Hi Chad,

Just as Xintong said, fine-grained resource management has not been introduced to Flink yet, and I think it is the elegant solution for your scenario. Task managers with different resource specifications will be allocated and started by the YARN/K8s resource manager according to your operators' resource requests, so your resource-intensive tasks will be deployed to task managers with more resources.

>> If integrated with k8s natively, Flink’s resource manager will call kubernetes API to allocate/release new pods and adjust the resource usage on demand, and then support scale-up and scale down.

We already have an internal implementation of FLINK-9953 and are trying to merge it into Flink. However, the K8s resource manager will do the same as YARN: allocate task managers with the same resource specification. Of course, it will become much more powerful once fine-grained resource management has been introduced to Flink; you will be able to specify the resources for every operator to meet its performance profile.
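
To make that concrete, below is a rough sketch of what specifying resources per operator could look like. Everything compiles against the current DataStream API except the commented-out builder, which is a hypothetical illustration of a per-group resource specification; no such public API exists in Flink today:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedResourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)        // placeholder source
                .map(String::toUpperCase)              // stand-in for a resource-intensive UDF
                .name("heavy-op")
                // Today: only a name can be attached to the slot sharing group.
                .slotSharingGroup("heavy")
                // Hypothetical (illustration only): attach a resource profile to the
                // group so the resource manager can request a right-sized task manager
                // (e.g. 4 cores / 8 GB heap) just for these slots, instead of
                // provisioning every task manager for the worst case:
                //
                // .slotSharingGroup(SlotSharingGroup.newBuilder("heavy")
                //         .setCpuCores(4.0)
                //         .setTaskHeapMemoryMB(8192)
                //         .build())
                .print();

        env.execute("fine-grained resource sketch");
    }
}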

BTW, FLINK-9953 does not contain the ability to auto-scale. It just allocates resources according to the Flink job and will not automatically scale up/down based on backlog or metrics. Auto-scaling of a Flink cluster is a more general topic and should be designed independently of the resource management system (YARN/K8s/Mesos).


Re: Dynamically allocating right-sized task resources

Chad Dombrova
Thanks for the great feedback.

> Just as Xintong said, fine-grained resource management has not been introduced to Flink yet, and I think it is the elegant solution for your scenario. Task managers with different resource specifications will be allocated and started by the YARN/K8s resource manager according to your operators' resource requests, so your resource-intensive tasks will be deployed to task managers with more resources.

Is this feature theoretical or is it on the Blink branch already?  Is there a Jira ticket for this?

>> If integrated with k8s natively, Flink’s resource manager will call kubernetes API to allocate/release new pods and adjust the resource usage on demand, and then support scale-up and scale down.

> BTW, FLINK-9953 does not contain the ability to auto-scale. It just allocates resources according to the Flink job and will not automatically scale up/down based on backlog or metrics. Auto-scaling of a Flink cluster is a more general topic and should be designed independently of the resource management system (YARN/K8s/Mesos).

Ok, so if I'm understanding you correctly, there are two opportunities to adjust/scale resources:
1) at the time the job is assigned to a task manager (currently supported by YARN/Mesos; the K8s case is covered by FLINK-9953)
2) after a job is already running on a task manager (not yet supported by any resource manager, due to lack of support within Flink in general)

And IIUC "fine grained resource management" is an improvement on top both of these, which allows Flink to use heterogenous task manager resources, each customized based on specifications of the operator.

I'm still unclear on what is meant by "it just allocates resources according to the Flink job".  What information does the job provide that is currently taken into consideration when allocating resources?  IIUC, the answer is the number of slots, since each slot will have the same resource profile.

I feel like the docs do not cover the Resource Manager story very well.  It's not mentioned at all in the page on Distributed Runtime Environment [1] or Jobs and Scheduling [2]. 

thanks,
chad