Optimal Flink configuration for Standalone cluster.

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Optimal Flink configuration for Standalone cluster.

Dimitris Vogiatzidakis
Hello,

I'm having a bit of trouble understanding the memory configuration on flink.
I'm using flink10.0.0 to read some datasets of edges and extract features. I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram each, and hopefully I could expand this as long as I can add extra nodes to the cluster.

So regarding the configuration file (flink-conf.yaml).
a) I can't understand when should I use process.size and when .flink.size.

b) From the detailed memory model I understand that Direct memory is included in both of flink and process size, however if I don't specify off-heap.task.size I get
" OutOfMemoryError: Direct buffer memory " .  Also should I change off-heap.fraction as well?

c)When I fix this, I get network buffers error, which if I understand correctly,  flink.size * network fraction , should be between min and max.

I can't find the 'perfect' configuration regarding my setup. What is the optimal way to use the system I have currently?

Thank you for your time.


Reply | Threaded
Open this post in threaded view
|

Re: Optimal Flink configuration for Standalone cluster.

Xintong Song
Hi Dimitris,

Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size` rather than `.process.size`. `.process.size` includes JVM metaspace and overhead in addition to `.flink.size`, which usually do not really matter for standalone clusters.
b) In case of direct OOMs, you should increase `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. And you can also specify the absolute network memory size by setting the min and max to the same value.

Here are my suggestions according to what you described.
  1. Since both off-heap and network memory seems insufficient, I would suggest to increase `taskmanager.memory.flink.size` to give your task managers more memory in total.
  2. If 1) does not work, I would suggest not to set the total memory (means configure neither `.flink.size` nor `process.size`), but go for the fine grained configuration where explicitly specify the individual memory components. Flink will automatically add them up to derive the total memory.
    1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you will also need to set `.task.heap.size` and `managed.size`.
    2. If you don't know how many heap/managed memory to configure, you can look for the configuration options in the beginning of the TM logs (`-Dkey=value`). Those are the values derived from your current configuration.

Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <[hidden email]> wrote:
Hello,

I'm having a bit of trouble understanding the memory configuration on flink.
I'm using flink10.0.0 to read some datasets of edges and extract features. I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram each, and hopefully I could expand this as long as I can add extra nodes to the cluster.

So regarding the configuration file (flink-conf.yaml).
a) I can't understand when should I use process.size and when .flink.size.

b) From the detailed memory model I understand that Direct memory is included in both of flink and process size, however if I don't specify off-heap.task.size I get
" OutOfMemoryError: Direct buffer memory " .  Also should I change off-heap.fraction as well?

c)When I fix this, I get network buffers error, which if I understand correctly,  flink.size * network fraction , should be between min and max.

I can't find the 'perfect' configuration regarding my setup. What is the optimal way to use the system I have currently?

Thank you for your time.


Reply | Threaded
Open this post in threaded view
|

Re: Optimal Flink configuration for Standalone cluster.

Dimitris Vogiatzidakis
Hi Xintong,
Thank you for the quick response.
doing 1), without increasing  'task.off-heap.size'  does not change the issue, but increasing the off-heap alone does.
What should the off-heap value size be? Since changing off-heap removes memory from '.task.heap.size' is there a ratio that I should follow for better performance?
Also, my guess (since I am dealing with big datasets) is that the more '.flink.size' I provide the better. Is that correct? Or will it add extra 'overhead' that could slow down my computations? In this particular cluster, since every Machine has 252 total DRAM and worst case scenario 180GB is free to use, should I just say .flink.size: 180g?

Thank you very much and sorry if i'm asking silly questions.
Dimitris Vogiatzidakis

On Sun, Jun 28, 2020 at 5:25 AM Xintong Song <[hidden email]> wrote:
Hi Dimitris,

Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size` rather than `.process.size`. `.process.size` includes JVM metaspace and overhead in addition to `.flink.size`, which usually do not really matter for standalone clusters.
b) In case of direct OOMs, you should increase `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. And you can also specify the absolute network memory size by setting the min and max to the same value.

Here are my suggestions according to what you described.
  1. Since both off-heap and network memory seems insufficient, I would suggest to increase `taskmanager.memory.flink.size` to give your task managers more memory in total.
  2. If 1) does not work, I would suggest not to set the total memory (means configure neither `.flink.size` nor `process.size`), but go for the fine grained configuration where explicitly specify the individual memory components. Flink will automatically add them up to derive the total memory.
    1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you will also need to set `.task.heap.size` and `managed.size`.
    2. If you don't know how many heap/managed memory to configure, you can look for the configuration options in the beginning of the TM logs (`-Dkey=value`). Those are the values derived from your current configuration.

Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <[hidden email]> wrote:
Hello,

I'm having a bit of trouble understanding the memory configuration on flink.
I'm using flink10.0.0 to read some datasets of edges and extract features. I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram each, and hopefully I could expand this as long as I can add extra nodes to the cluster.

So regarding the configuration file (flink-conf.yaml).
a) I can't understand when should I use process.size and when .flink.size.

b) From the detailed memory model I understand that Direct memory is included in both of flink and process size, however if I don't specify off-heap.task.size I get
" OutOfMemoryError: Direct buffer memory " .  Also should I change off-heap.fraction as well?

c)When I fix this, I get network buffers error, which if I understand correctly,  flink.size * network fraction , should be between min and max.

I can't find the 'perfect' configuration regarding my setup. What is the optimal way to use the system I have currently?

Thank you for your time.


Reply | Threaded
Open this post in threaded view
|

Re: Optimal Flink configuration for Standalone cluster.

Xintong Song
Since changing off-heap removes memory from '.task.heap.size' is there a ratio that I should follow for better performance?
I don't think so. It could really be specific to your workload. Some workload may need more heap memory while others may need more off-heap.

Also, my guess (since I am dealing with big datasets) is that the more '.flink.size' I provide the better. Is that correct?
In most cases, yes. But it is also possible the other way around. Larger `.flink.size` usually also means larger JVM heap space, which reduces the frequency of GCs but increases the time cost on each GC (espeacially full GCs). On the other hand, if the memory is large enough, it could become the CPU resource rather than the memory that limits the performance. In such cases, increasing memory size won't give you more performance improvement but might introduce more GC overheads, thus harm the overall performance.

In this particular cluster, since every Machine has 252 total DRAM and worst case scenario 180GB is free to use, should I just say .flink.size: 180g?
Not sure about this. I would suggest to avoid large task managers (say tens of GBs) unless absolutely necessary. Alternatively, you can try to launch multiple TMs on one physical machine, to reduce the memory size of each TM process.

BTW, what kind of workload are you running? Is it streaming or batch?
 

Thank you~

Xintong Song



On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <[hidden email]> wrote:
Hi Xintong,
Thank you for the quick response.
doing 1), without increasing  'task.off-heap.size'  does not change the issue, but increasing the off-heap alone does.
What should the off-heap value size be? Since changing off-heap removes memory from '.task.heap.size' is there a ratio that I should follow for better performance?
Also, my guess (since I am dealing with big datasets) is that the more '.flink.size' I provide the better. Is that correct? Or will it add extra 'overhead' that could slow down my computations? In this particular cluster, since every Machine has 252 total DRAM and worst case scenario 180GB is free to use, should I just say .flink.size: 180g?

Thank you very much and sorry if i'm asking silly questions.
Dimitris Vogiatzidakis

On Sun, Jun 28, 2020 at 5:25 AM Xintong Song <[hidden email]> wrote:
Hi Dimitris,

Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size` rather than `.process.size`. `.process.size` includes JVM metaspace and overhead in addition to `.flink.size`, which usually do not really matter for standalone clusters.
b) In case of direct OOMs, you should increase `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. And you can also specify the absolute network memory size by setting the min and max to the same value.

Here are my suggestions according to what you described.
  1. Since both off-heap and network memory seems insufficient, I would suggest to increase `taskmanager.memory.flink.size` to give your task managers more memory in total.
  2. If 1) does not work, I would suggest not to set the total memory (means configure neither `.flink.size` nor `process.size`), but go for the fine grained configuration where explicitly specify the individual memory components. Flink will automatically add them up to derive the total memory.
    1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you will also need to set `.task.heap.size` and `managed.size`.
    2. If you don't know how many heap/managed memory to configure, you can look for the configuration options in the beginning of the TM logs (`-Dkey=value`). Those are the values derived from your current configuration.

Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <[hidden email]> wrote:
Hello,

I'm having a bit of trouble understanding the memory configuration on flink.
I'm using flink10.0.0 to read some datasets of edges and extract features. I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram each, and hopefully I could expand this as long as I can add extra nodes to the cluster.

So regarding the configuration file (flink-conf.yaml).
a) I can't understand when should I use process.size and when .flink.size.

b) From the detailed memory model I understand that Direct memory is included in both of flink and process size, however if I don't specify off-heap.task.size I get
" OutOfMemoryError: Direct buffer memory " .  Also should I change off-heap.fraction as well?

c)When I fix this, I get network buffers error, which if I understand correctly,  flink.size * network fraction , should be between min and max.

I can't find the 'perfect' configuration regarding my setup. What is the optimal way to use the system I have currently?

Thank you for your time.


Reply | Threaded
Open this post in threaded view
|

Re: Optimal Flink configuration for Standalone cluster.

Dimitris Vogiatzidakis
It could really be specific to your workload. Some workload may need more heap memory while others may need more off-heap. 
The main 'process' of my project creates a cross product of datasets and then applies a function to all of them to extract some features.
 
Alternatively, you can try to launch multiple TMs on one physical machine, to reduce the memory size of each TM process.
If I understand correctly you mean instead of 1 TM with 32 slots, I should have 4 TMs with 8? Or else i would exceed the amount of total cores and probably have tasks 'waiting' on other tasks to be completed.


BTW, what kind of workload are you running? Is it streaming or batch?
It is Batch. I have dataset of edges and try to extract features , to later be used for link prediction.

Thank you
-Dimitris Vogiatzidakis

>

On Mon, Jun 29, 2020 at 5:07 AM Xintong Song <[hidden email]> wrote:
Since changing off-heap removes memory from '.task.heap.size' is there a ratio that I should follow for better performance?
I don't think so. It could really be specific to your workload. Some workload may need more heap memory while others may need more off-heap.

Also, my guess (since I am dealing with big datasets) is that the more '.flink.size' I provide the better. Is that correct?
In most cases, yes. But it is also possible the other way around. Larger `.flink.size` usually also means larger JVM heap space, which reduces the frequency of GCs but increases the time cost on each GC (espeacially full GCs). On the other hand, if the memory is large enough, it could become the CPU resource rather than the memory that limits the performance. In such cases, increasing memory size won't give you more performance improvement but might introduce more GC overheads, thus harm the overall performance.

In this particular cluster, since every Machine has 252 total DRAM and worst case scenario 180GB is free to use, should I just say .flink.size: 180g?
Not sure about this. I would suggest to avoid large task managers (say tens of GBs) unless absolutely necessary. Alternatively, you can try to launch multiple TMs on one physical machine, to reduce the memory size of each TM process.

BTW, what kind of workload are you running? Is it streaming or batch?
 

Thank you~

Xintong Song



On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <[hidden email]> wrote:
Hi Xintong,
Thank you for the quick response.
doing 1), without increasing  'task.off-heap.size'  does not change the issue, but increasing the off-heap alone does.
What should the off-heap value size be? Since changing off-heap removes memory from '.task.heap.size' is there a ratio that I should follow for better performance?
Also, my guess (since I am dealing with big datasets) is that the more '.flink.size' I provide the better. Is that correct? Or will it add extra 'overhead' that could slow down my computations? In this particular cluster, since every Machine has 252 total DRAM and worst case scenario 180GB is free to use, should I just say .flink.size: 180g?

Thank you very much and sorry if i'm asking silly questions.
Dimitris Vogiatzidakis

On Sun, Jun 28, 2020 at 5:25 AM Xintong Song <[hidden email]> wrote:
Hi Dimitris,

Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size` rather than `.process.size`. `.process.size` includes JVM metaspace and overhead in addition to `.flink.size`, which usually do not really matter for standalone clusters.
b) In case of direct OOMs, you should increase `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. And you can also specify the absolute network memory size by setting the min and max to the same value.

Here are my suggestions according to what you described.
  1. Since both off-heap and network memory seems insufficient, I would suggest to increase `taskmanager.memory.flink.size` to give your task managers more memory in total.
  2. If 1) does not work, I would suggest not to set the total memory (means configure neither `.flink.size` nor `process.size`), but go for the fine grained configuration where explicitly specify the individual memory components. Flink will automatically add them up to derive the total memory.
    1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you will also need to set `.task.heap.size` and `managed.size`.
    2. If you don't know how many heap/managed memory to configure, you can look for the configuration options in the beginning of the TM logs (`-Dkey=value`). Those are the values derived from your current configuration.

Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <[hidden email]> wrote:
Hello,

I'm having a bit of trouble understanding the memory configuration on flink.
I'm using flink10.0.0 to read some datasets of edges and extract features. I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram each, and hopefully I could expand this as long as I can add extra nodes to the cluster.

So regarding the configuration file (flink-conf.yaml).
a) I can't understand when should I use process.size and when .flink.size.

b) From the detailed memory model I understand that Direct memory is included in both of flink and process size, however if I don't specify off-heap.task.size I get
" OutOfMemoryError: Direct buffer memory " .  Also should I change off-heap.fraction as well?

c)When I fix this, I get network buffers error, which if I understand correctly,  flink.size * network fraction , should be between min and max.

I can't find the 'perfect' configuration regarding my setup. What is the optimal way to use the system I have currently?

Thank you for your time.