[DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Guowei Ma
Hi, all


In Flink 1.12 we introduced the TM merge shuffle. However, the out-of-the-box experience of using it is not very good, mainly because the default configuration always makes users run into OOM errors [1]. We therefore hope to introduce a managed memory pool for the TM merge shuffle to avoid the problem.

Goals

  1. Don't affect the streaming and pipelined-shuffle-only batch setups.
  2. Don't mix memory with different life cycles in the same pool, e.g., write buffers needed by running tasks and read buffers needed even after the tasks have finished.
  3. Users can use the TM merge shuffle with the default memory configuration. (Further tuning may be needed to optimize performance, but jobs should not fail with the default configuration.)

Proposal

  1. Introduce a configuration option `taskmanager.memory.network.batch-read` to specify the size of this memory pool. The default value is 16m.
  2. Allocate the pool lazily, i.e., the memory pool is allocated when the TM merge shuffle is used for the first time.
  3. The pool size will not be added to the TM's total memory size, but will be considered part of `taskmanager.memory.framework.off-heap.size`. If the TM merge shuffle is enabled, we need to check that the pool size is not larger than the framework off-heap size.


With this default configuration, allocating the memory pool is very unlikely to fail. The default framework off-heap memory is currently 128m, which is mainly used by Netty. Since we introduced zero copy, Netty's usage of it has been reduced; see [2] for the detailed data.
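The size check from item 3 of the proposal could look roughly like the sketch below. It is only an illustration in Python, not Flink code: `parse_memory_size` and `check_batch_read_pool` are hypothetical helper names, the size parser is deliberately simplified, and the defaults (16m and 128m) are the ones quoted in this mail.

```python
import re

# Binary multipliers for size suffixes such as "16m" or "128m" (simplified assumption).
_UNIT_BYTES = {"": 1, "b": 1, "k": 1024, "kb": 1024,
               "m": 1024 ** 2, "mb": 1024 ** 2, "g": 1024 ** 3, "gb": 1024 ** 3}

# Defaults quoted in the proposal.
DEFAULT_BATCH_READ = "16m"
DEFAULT_FRAMEWORK_OFF_HEAP = "128m"


def parse_memory_size(text):
    """Parse a size string like '16m' into bytes (hypothetical, simplified parser)."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-zA-Z]*)\s*", text)
    if match is None or match.group(2).lower() not in _UNIT_BYTES:
        raise ValueError(f"cannot parse memory size: {text!r}")
    return int(match.group(1)) * _UNIT_BYTES[match.group(2).lower()]


def check_batch_read_pool(config):
    """Return the pool size in bytes, or raise if it exceeds the framework off-heap size."""
    pool = parse_memory_size(
        config.get("taskmanager.memory.network.batch-read", DEFAULT_BATCH_READ))
    off_heap = parse_memory_size(
        config.get("taskmanager.memory.framework.off-heap.size", DEFAULT_FRAMEWORK_OFF_HEAP))
    if pool > off_heap:
        raise ValueError(
            "taskmanager.memory.network.batch-read is larger than "
            "taskmanager.memory.framework.off-heap.size; please increase the latter.")
    return pool


if __name__ == "__main__":
    # With the defaults, the 16m pool fits easily into the 128m framework off-heap memory.
    print(check_batch_read_pool({}))
```

With the defaults the check passes; it only fails once the pool is configured to be larger than the framework off-heap size.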

Known Limitations

Usability for increasing the memory pool size

In addition to increasing `taskmanager.memory.network.batch-read`, the user may also need to adjust `taskmanager.memory.framework.off-heap.size` at the same time. If the user forgets this, the check is likely to fail when the memory pool is allocated.

So in the following two situations, we will prompt the user to increase `taskmanager.memory.framework.off-heap.size`:

  1. `taskmanager.memory.network.batch-read` is larger than `taskmanager.memory.framework.off-heap.size`.
  2. Allocating the pool runs into an OOM.
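To illustrate how lazy allocation and the two prompts could fit together, here is a rough Python sketch. It is not Flink code: `LazyBatchReadPool` and `BatchReadPoolError` are hypothetical names, heap `bytearray`s stand in for direct memory, and the 32 KiB buffer size is an assumption made only for the example.

```python
HINT = "Please increase 'taskmanager.memory.framework.off-heap.size'."


class BatchReadPoolError(RuntimeError):
    """Raised when the batch read pool cannot be set up (hypothetical error type)."""


class LazyBatchReadPool:
    def __init__(self, pool_bytes, framework_off_heap_bytes,
                 buffer_bytes=32 * 1024, allocate=bytearray):
        self._pool_bytes = pool_bytes
        self._framework_off_heap_bytes = framework_off_heap_bytes
        self._buffer_bytes = buffer_bytes  # assumed buffer size, for illustration only
        self._allocate = allocate          # injectable so that failures can be simulated
        self._buffers = None               # nothing is allocated until the first request

    @property
    def allocated(self):
        return self._buffers is not None

    def request_buffers(self):
        """Allocate the whole pool on first use and return the same buffers afterwards."""
        if self._buffers is None:
            self._buffers = self._allocate_pool()
        return self._buffers

    def _allocate_pool(self):
        # Situation 1: the pool is configured larger than the framework off-heap size.
        if self._pool_bytes > self._framework_off_heap_bytes:
            raise BatchReadPoolError("Batch read pool exceeds framework off-heap size. " + HINT)
        try:
            count = self._pool_bytes // self._buffer_bytes
            return [self._allocate(self._buffer_bytes) for _ in range(count)]
        except MemoryError as exc:
            # Situation 2: the allocation itself runs out of memory.
            raise BatchReadPoolError("Out of memory while allocating the pool. " + HINT) from exc


if __name__ == "__main__":
    pool = LazyBatchReadPool(16 * 1024 * 1024, 128 * 1024 * 1024)
    print(pool.allocated)               # nothing allocated before the first use
    print(len(pool.request_buffers()))  # number of buffers in the pool
```

Both failure paths end in the same hint, so the user is pointed to the framework off-heap size regardless of which check trips first.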


An alternative is for the system to adjust the framework off-heap size automatically whenever the user changes the size of the memory pool. But we are not entirely sure about this, because it is implicit and complicates the memory configuration.

Potential memory waste

In the first step, the memory pool will not be released once allocated. This means that even if no batch job runs afterwards, the pooled memory cannot be used by other consumers.

We are not releasing the pool in the first step because of the concern that frequently allocating and deallocating the entire pool may increase GC pressure. Investigating how to release the pool dynamically when it is no longer needed is left as a future follow-up.


Looking forward to your feedback.

 

[1] https://issues.apache.org/jira/browse/FLINK-20740

[2] https://github.com/apache/flink/pull/7368

Best,
Guowei

Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Stephan Ewen
Thanks Guowei, for the proposal.

As discussed offline already, I think this sounds good.

One thought is that 16m sounds very small for a default read buffer pool. How risky do you think it is to increase this to 32m or 64m?

Best,
Stephan

In reply to Guowei Ma's message of Fri, Mar 5, 2021 at 4:33 AM.

Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Till Rohrmann
Thanks for this proposal Guowei. +1 for it.

Concerning the default size, maybe we can run some experiments and see how the system behaves with different pool sizes.

Cheers,
Till

In reply to Stephan Ewen's message of Fri, Mar 5, 2021 at 2:45 PM.

Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Guowei Ma
Hi, all

Thanks all for your suggestions and feedback.
I think it is a good idea to increase the default size of the separate pool based on testing. I am fine with adding the suffix (".size") to the config name, which makes it clearer to the user.
But I am a little worried about adding the prefix ("framework"), because currently the TM shuffle service is only a shuffle plugin, which is not part of the framework. So maybe we could add a clear explanation in the documentation?

Best,
Guowei


On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) <[hidden email]> wrote:
Thanks for the suggestions. I will do some tests and share the results after the implementation is ready. Then we can give a proper default value.

Best,
Yingjie

In reply to Till Rohrmann's message of March 5, 2021, 23:03:10.

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Till Rohrmann
Thanks for the update Yingjie. Then let's go with 32 MB I would say.

Concerning the name of the configuration option I see Xintong's point. If the batch shuffle is subtracted from `taskmanager.memory.framework.off-heap.size` because it is part of the off-heap pool, then something like `taskmanager.memory.framework.off-heap.batch-shuffle.size` would better reflect this situation. On the other hand, this is quite a long configuration name. But it is also a quite advanced configuration option which, hopefully, should not be touched by too many of our users.

Cheers,
Till

On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) <[hidden email]> wrote:
Hi all,

I have tested the default memory size with both batch (TPC-DS) and streaming jobs running in one session cluster (more than 100 queries). The results are:
1. All settings (16M, 32M and 64M) work well without any OOM.
2. For streaming jobs running after batch jobs, there is no performance or stability regression.
3. 32M and 64M perform better (by over 10%) for the test batch job on HDD.

Based on the above results, I think 32M is a good default choice, because the performance is good enough for the test job and compared to 64M, more direct memory can be used by netty and other components. What do you think?
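To make the trade-off concrete, here is a small Python sketch of the arithmetic. It assumes the 128m default framework off-heap size mentioned in the original proposal and that the pool is carved out of it; `remaining_for_other_components` is a hypothetical helper name.

```python
FRAMEWORK_OFF_HEAP_MB = 128  # default quoted in the original proposal


def remaining_for_other_components(pool_mb, framework_off_heap_mb=FRAMEWORK_OFF_HEAP_MB):
    """Framework off-heap memory (in MB) left for Netty and others once the pool is allocated."""
    return framework_off_heap_mb - pool_mb


if __name__ == "__main__":
    for pool_mb in (16, 32, 64):
        left = remaining_for_other_components(pool_mb)
        print(f"pool={pool_mb}M -> {left}M left for Netty and other components")
```

Under these assumptions, 32M leaves 96M for other consumers, while 64M would leave only 64M.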

BTW, have we reached a consensus on the configuration key? I am temporarily using `taskmanager.memory.network.batch-shuffle-read.size` in my PR. Any suggestions about that?

Best,
Yingjie (Kevin)

In reply to Guowei Ma's message of March 9, 2021, 17:28:35.

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Stephan Ewen
In reply to this post by Guowei Ma
Hi Yingjie!

Thanks for doing those experiments, the results look good. Let's go ahead with 32M then.

Regarding the key, I am not strongly opinionated there. There are arguments for both keys, (1) making the key part of the network pool config as you did here or (2) making it part of the TM config (relative to framework off-heap memory). I find (1) quite understandable, but it is personal taste, so I can go with either option.

Best,
Stephan

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Guowei Ma
Hi,
I discussed this with Xintong and Yingjie offline, and we agreed that the name `taskmanager.memory.framework.off-heap.batch-shuffle.size` better reflects the current memory usage. So we decided to use the name Till suggested.
Thank you all for your valuable feedback.
Best,
Guowei

In reply to Stephan Ewen's message of Mon, Mar 22, 2021 at 5:21 PM.