thread model issue in TaskManager

Zhijiang(wangzhijiang999)

As far as I know, Flink uses a thread model in the TaskManager: one TaskManager process may run many different operator threads, and these threads compete for the memory of the process. I know that each TaskManager has a MemoryManager component that controls the LocalBufferPool of the InputGate and ResultPartition for each task. But if a UDF consumes a lot of memory, it uses JVM heap memory, which Flink cannot control. If I use Flink as a common platform, some users' UDFs will consume a lot of memory, and this may affect other threads in the process, especially by causing an OOM. I know there are shared-slot and isolated-slot properties, but they only constrain how tasks are scheduled within one TaskManager. Can I schedule a task onto a separate TaskManager if it consumes a lot of memory and I do not want it to affect other tasks? Or are there any suggestions for dealing with the thread model? As far as I know, Spark also uses a thread model, whereas Hadoop 2 uses a process model.


Thank you for any suggestions in advance!
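To make the distinction in the question concrete, here is a minimal Java sketch of a page-based memory budget, the kind of accounting a TaskManager memory manager can enforce: a fixed number of fixed-size pages is lent to tasks and returned, and a task that runs out has to spill instead of growing. The names `ManagedMemoryPool`, `allocatePage`, and `release` are hypothetical and are not Flink's API. What the sketch cannot do is the point of the question: a UDF that simply calls `new byte[...]` never goes through the pool, so that heap usage is invisible to the budget and can still exhaust the JVM shared by all task threads.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not Flink's MemoryManager API: a fixed budget of
// equally sized pages that tasks borrow and return.
final class ManagedMemoryPool {
    private final int pageSize;
    private final Deque<byte[]> freePages = new ArrayDeque<>();

    ManagedMemoryPool(int numPages, int pageSize) {
        if (numPages < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("invalid pool dimensions");
        }
        this.pageSize = pageSize;
        // Allocate the whole budget up front so the pool's footprint is fixed.
        for (int i = 0; i < numPages; i++) {
            freePages.push(new byte[pageSize]);
        }
    }

    // Returns a free page, or null when the budget is exhausted; the task must
    // then spill to disk instead of allocating more memory.
    synchronized byte[] allocatePage() {
        return freePages.poll();
    }

    // Hands a page back so that other tasks can reuse it.
    synchronized void release(byte[] page) {
        if (page == null || page.length != pageSize) {
            throw new IllegalArgumentException("not a page from this pool");
        }
        freePages.push(page);
    }

    synchronized int availablePages() {
        return freePages.size();
    }
}
```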


Re: thread model issue in TaskManager

Fabian Hueske-2
Hi,

it is currently not possible to isolate tasks that consume a lot of JVM heap memory and schedule them to a specific slot (or TaskManager).
If you operate in a YARN setup, you can isolate different jobs from each other by starting a new YARN session for each job, but tasks within the same job cannot be isolated from each other right now.
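As a rough illustration of sizing a per-job YARN session, the sketch below computes how many TaskManagers (YARN containers, each its own JVM) a job needs for a given parallelism and number of slots per TaskManager. `SessionLayout` is a hypothetical helper, not part of Flink. In the Flink releases of this period these two numbers corresponded, as far as I recall, to the `-n` (number of containers) and `-s` (slots per TaskManager) options of `yarn-session.sh`.

```java
// Hypothetical helper, not part of Flink: sizes a per-job YARN session.
// Each TaskManager runs in its own YARN container, i.e. its own JVM heap.
final class SessionLayout {
    final int taskManagers;
    final int slotsPerTaskManager;

    private SessionLayout(int taskManagers, int slotsPerTaskManager) {
        this.taskManagers = taskManagers;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    static SessionLayout forParallelism(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        // Ceiling division: enough TaskManagers to offer at least `parallelism` slots.
        int taskManagers = (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
        return new SessionLayout(taskManagers, slotsPerTaskManager);
    }

    // Number of slots (and hence parallel task instances) that share one JVM heap.
    int slotsSharingOneHeap() {
        return slotsPerTaskManager;
    }
}
```

For example, a job with parallelism 10 needs 10 one-slot TaskManagers, or 3 TaskManagers with 4 slots each (12 slots, 2 idle). As I understand Flink's slot sharing, one slot can still host subtasks of several different operators of the same job, so one slot per TaskManager separates parallel instances rather than pipeline stages.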

Cheers, Fabian

2015-07-30 4:02 GMT+02:00 wangzhijiang999 <[hidden email]>:


Re: thread model issue in TaskManager

Stephan Ewen
As Fabian suggested, YARN is a good way to go for isolation (it actually isolates more than a JVM, which is very nice).

Here are some additional things you can do:

  - For isolation between parallel tasks (within a job), start your YARN job such that each TaskManager has one slot, and start many TaskManagers. This is slightly less efficient (though not by much) than running fewer TaskManagers with more slots each. (*)

  - If you need to isolate successor tasks in a job from predecessor tasks, you can select "batch" execution mode. By default, the system uses "pipelined" execution mode. In a MapReduce case, this means that mappers and reducers run concurrently. With "batch" mode, reducers run only after all mappers have finished.
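The difference between the two exchange modes can be simulated with plain Java threads. In the pipelined variant, a mapper thread and a reducer thread exchange records through a small bounded queue, so the reducer consumes records while the mapper is still producing; in the batch variant, the mapper's complete output is materialized first and the reducer starts only afterwards. This is a simplified model under those assumptions, not Flink's runtime; in the DataSet API the mode is, as far as I know, selected with `env.getConfig().setExecutionMode(ExecutionMode.BATCH)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the two data exchange modes; not Flink's runtime.
final class ExchangeModes {
    private static final int END = -1; // end-of-stream marker (records are >= 0)

    // Pipelined: returns true if the reducer saw records while the mapper was still running.
    static boolean pipelinedOverlaps(int records) {
        // A one-element buffer forces the two threads to run in lockstep.
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(1);
        AtomicBoolean mapperDone = new AtomicBoolean(false);
        AtomicBoolean overlapped = new AtomicBoolean(false);

        Thread mapper = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    channel.put(i);
                }
                mapperDone.set(true);
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mapper");
        Thread reducer = new Thread(() -> {
            try {
                for (int r = channel.take(); r != END; r = channel.take()) {
                    if (!mapperDone.get()) {
                        overlapped.set(true);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reducer");
        mapper.start();
        reducer.start();
        join(mapper);
        join(reducer);
        return overlapped.get();
    }

    // Batch: the mapper's full output is materialized before the reducer starts.
    static boolean batchOverlaps(int records) {
        List<Integer> materialized = new ArrayList<>();
        AtomicBoolean mapperDone = new AtomicBoolean(false);
        AtomicBoolean overlapped = new AtomicBoolean(false);

        Thread mapper = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                materialized.add(i);
            }
            mapperDone.set(true);
        }, "mapper");
        mapper.start();
        join(mapper); // blocking exchange: wait until the producer has finished completely

        Thread reducer = new Thread(() -> {
            for (int i = 0; i < materialized.size(); i++) {
                if (!mapperDone.get()) {
                    overlapped.set(true);
                }
            }
        }, "reducer");
        reducer.start();
        join(reducer);
        return overlapped.get();
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + t.getName(), e);
        }
    }
}
```

With at least three records, the pipelined run always overlaps: when the reducer takes the first record, the mapper is still blocked on the full one-element queue. The batch run never overlaps, which is what isolates the reducers from the mappers in time.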

Greetings,
Stephan


(*) Multiple slots in one TaskManager are more efficient because TaskManagers multiplex the multiple data exchanges of a shuffle over a single TCP connection, which reduces per-exchange overhead and usually increases throughput.
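To illustrate the footnote, the sketch below multiplexes several logical channels over one byte stream standing in for a single TCP connection: every frame carries a channel id and a length prefix, so the receiver can route each payload back to its channel. The frame layout and the `Multiplexer` class are hypothetical and are not Flink's wire format; an in-memory stream replaces the socket to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: several logical channels share one byte stream, which stands in for a
// single TCP connection. Frame layout (hypothetical): [channelId:int][length:int][payload].
final class Multiplexer {
    private final ByteArrayOutputStream wire = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(wire);

    // Writes one framed message for the given logical channel onto the shared stream.
    void send(int channelId, String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try {
            out.writeInt(channelId);
            out.writeInt(bytes.length);
            out.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Everything written so far, as the receiving side would read it off the socket.
    byte[] wireBytes() {
        try {
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return wire.toByteArray();
    }

    // Receiver side: splits the shared stream back into per-channel message lists.
    static Map<Integer, List<String>> demultiplex(byte[] wireBytes) {
        Map<Integer, List<String>> channels = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wireBytes))) {
            while (true) {
                int channelId;
                try {
                    channelId = in.readInt();
                } catch (EOFException endOfStream) {
                    break; // no more frames
                }
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                channels.computeIfAbsent(channelId, id -> new ArrayList<>())
                        .add(new String(payload, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return channels;
    }
}
```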



On Thu, Jul 30, 2015 at 12:10 PM, Fabian Hueske <[hidden email]> wrote:

Re: thread model issue in TaskManager

Zhijiang(wangzhijiang999)
In reply to this post by Fabian Hueske-2

Hi Stephan, Fabian,

Thank you for your replies! I will actually run Flink on YARN. It is feasible to isolate the different tasks of one job by starting a new YARN session, but that means every job has its own YARN session and each TaskManager has just one slot. If I want to run all jobs in one YARN cluster in pipelined mode, with one TaskManager able to run many tasks, another option is a process model, in which every task is a process rather than a thread, so isolation comes naturally. Do you think it is feasible to modify the Flink runtime to achieve this? Or, if we want to do that, do you have any suggestions? Thank you!
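The process model described above can be sketched in a few lines: each task is launched in its own JVM with its own heap limit (`-Xmx`), so a UDF that exhausts its heap only terminates its own process. `TaskProcessLauncher` and the `TaskRunner` entry point are hypothetical; as Stephan points out in his reply, a real task process would additionally need its own networking, memory management, and I/O components.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a process-per-task launcher; not part of Flink.
final class TaskProcessLauncher {
    private final String classPath;

    TaskProcessLauncher(String classPath) {
        this.classPath = classPath;
    }

    // Builds (but does not start) the command for one task's dedicated JVM.
    ProcessBuilder forTask(String taskId, int heapMegabytes) {
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        List<String> cmd = new ArrayList<>();
        cmd.add(javaBin);
        cmd.add("-Xmx" + heapMegabytes + "m"); // per-task heap cap: an OOM stays inside this JVM
        cmd.add("-cp");
        cmd.add(classPath);
        cmd.add("TaskRunner");                // hypothetical entry point that runs one task
        cmd.add(taskId);
        return new ProcessBuilder(cmd).inheritIO();
    }
}
```

Starting a task would then be `launcher.forTask("map-3", 512).start()`, followed by `waitFor()` on the returned `Process` to collect its exit code.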


------------------------------------------------------------------
From: Stephan Ewen <[hidden email]>
Sent: Monday, August 3, 2015 00:36
To: user <[hidden email]>
Cc: wangzhijiang999 <[hidden email]>
Subject: Re: thread model issue in TaskManager




Re: Re: thread model issue in TaskManager

Stephan Ewen
To run a task as a separate process, that process would need the following:

  - Communication with the TaskManager, or directly with the JobManager
  - A network stack for shuffles, to exchange data with other processes (exchanges are streamed through memory, not through files)
  - A memory manager
  - An I/O manager

That is almost a full TaskManager by itself, so using one TaskManager per job and per task is already very close to that model.

What would help is a mode in which the JobManager spawns these TaskManagers on demand, using YARN or Mesos. This would be very close to the Hadoop 2/YARN/Tez model, which is a good isolation model.

What do you think?
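The on-demand mode suggested here can be sketched as follows: the JobManager requests one single-slot TaskManager container per task from the cluster resource manager and releases it when the task finishes. `ContainerPool`, `FakeContainerPool`, and `OnDemandScheduler` are hypothetical names; the in-memory fake stands in for YARN or Mesos, whose real client APIs are not shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resource-manager interface standing in for YARN or Mesos.
interface ContainerPool {
    String requestContainer(int memoryMb); // returns a container id
    void releaseContainer(String containerId);
}

// In-memory fake, for illustration only.
final class FakeContainerPool implements ContainerPool {
    private int nextId = 0;
    final List<String> live = new ArrayList<>();

    @Override
    public String requestContainer(int memoryMb) {
        String id = "container-" + (nextId++) + "-" + memoryMb + "mb";
        live.add(id);
        return id;
    }

    @Override
    public void releaseContainer(String containerId) {
        live.remove(containerId);
    }
}

// Sketch of on-demand scheduling: one single-slot TaskManager container per task,
// acquired when the task is deployed and released when it finishes.
final class OnDemandScheduler {
    private final ContainerPool pool;
    private final Map<String, String> taskToContainer = new LinkedHashMap<>();

    OnDemandScheduler(ContainerPool pool) {
        this.pool = pool;
    }

    String deploy(String taskId, int memoryMb) {
        String container = pool.requestContainer(memoryMb);
        taskToContainer.put(taskId, container);
        return container;
    }

    void taskFinished(String taskId) {
        String container = taskToContainer.remove(taskId);
        if (container != null) {
            pool.releaseContainer(container); // give the resources back to the cluster
        }
    }

    int runningTaskManagers() {
        return taskToContainer.size();
    }
}
```

Because every task owns its container, a task that runs out of heap only loses its own TaskManager, and resources are held only while tasks are actually running.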

   

On Mon, Aug 3, 2015 at 4:12 AM, wangzhijiang999 <[hidden email]> wrote:


Re: Re: thread model issue in TaskManager

Zhijiang(wangzhijiang999)
In reply to this post by Zhijiang(wangzhijiang999)

Thank you for the suggestions. All my applications will run on YARN. I want to use Flink's JobGraph model and draw on the runtime stack of Twitter Heron.

Some details still need to be worked out later; I am currently studying the Flink code further.


------------------------------------------------------------------
From: Stephan Ewen <[hidden email]>
Sent: Monday, August 3, 2015 22:17
To: user <[hidden email]>, wangzhijiang999 <[hidden email]>
Subject: Re: Re: thread model issue in TaskManager


