Flink memory usage


Flink memory usage

Newport, Billy

How does Flink use memory? We’re seeing cases where a job on larger datasets throws OOM exceptions partway through the job. We’re using the DataSet API. Shouldn’t Flink be streaming from disk to disk? We work around it by using fewer slots, but it seems unintuitive that I need to change these settings given Flink != Spark. Why isn’t Flink’s memory usage constant? Why couldn’t I run a job of any size successfully with a single task and a single slot, just taking much longer to run?

 

Thanks

Billy

 

 


Re: Flink memory usage

Fabian Hueske-2
Hi Billy,

Flink's internal operators are implemented to not allocate heap space proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building a hash table) the data is serialized into managed memory. If all memory is in use, Flink starts spilling to disk. This blog post discusses how Flink uses its managed memory [1] (still up to date, even though it's almost 2 years old).
The runtime code should actually be quite stable. Most of the code has been there for several years (even before Flink was donated to the ASF) and we haven't seen many bugs reported for the DataSet runtime. Of course this does not mean that the code doesn't contain bugs.

However, Flink does not take care of memory used by user code. For example, a GroupReduceFunction that collects a lot of data, e.g., in a List on the heap, can still kill a program.
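
For illustration, a made-up function of exactly that shape is sketched below; the buffered List lives on the JVM heap and is invisible to Flink's memory management, so heap usage grows with the size of the group:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Illustrative anti-pattern: materializing a whole group on the heap.
public class BufferingGroupReduce implements GroupReduceFunction<String, Integer> {
    @Override
    public void reduce(Iterable<String> values, Collector<Integer> out) {
        List<String> buffer = new ArrayList<>();
        for (String value : values) {
            buffer.add(value);          // heap usage grows with the group size
        }
        out.collect(buffer.size());
        // Where possible, aggregate incrementally while iterating instead of
        // collecting the whole group into a Java collection.
    }
}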

I would check if you have user functions that require lots of heap memory.
Also reducing the size of the managed memory to have more heap space available might help.
If that doesn't solve the problem, it would be good if you could share some details about your job (which operators, which local strategies, how many operators) that might help to identify the misbehaving operator.
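
Regarding the managed-memory suggestion above, the relevant flink-conf.yaml keys in this Flink generation are taskmanager.memory.fraction (relative to the TM heap) and taskmanager.memory.size (absolute, in MB); for example (values purely illustrative, not a recommendation):

taskmanager.heap.mb: 36864           # task manager JVM heap
taskmanager.memory.fraction: 0.5     # default is 0.7; a lower fraction leaves more heap for user code
# taskmanager.memory.size: 16384     # alternatively, an absolute managed-memory budget in MB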

Thanks, Fabian

[1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html


RE: Flink memory usage

Newport, Billy

I don’t think our functions are memory heavy; they are typically cogroups that merge the records on the left with the records on the right.

 

We’re currently requiring 720GB of heap to do our processing, which frankly appears ridiculous to us. Could too much parallelism be causing the problem? Looking at:

 

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html

 

If we are processing 17 “datasets” in a single job and each has an individual parallelism of 40, is that a potential total parallelism of 17*40? Given your network buffers calculation based on the square of the parallelism, would that explain it, or only if we explicitly configure it that way:

 

taskmanager.network.numberOfBuffers: p ^ 2 * t * 4 


where p is the maximum parallelism of the job and t is the number of task managers.

You can process more than one parallel task per TM if you configure more than one processing slot per machine (taskmanager.numberOfTaskSlots). The TM will divide its memory among all its slots. So it would be possible to start one TM for each machine with 100GB+ memory and 48 slots each.

 

Our pipeline for each dataset looks like this:

 

Read avro file -> FlatMap -> Validate each record with a flatmap ->

Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the validated avro file above -> }

Read Parquet -> FlatMap -> Filter Dead Rows -> } Union cogroup with dead rows and write result to parquet file.
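
In DataSet API terms the skeleton is roughly the following (a sketch only: Record, the key field, and the readAvro/readParquet/writeParquet helpers stand in for our actual types and input/output formats):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read avro file -> FlatMap -> Validate each record
DataSet<Record> validated = readAvro(env, avroPath)
        .flatMap(new ParseAvroFlatMap())
        .flatMap(new ValidateFlatMap());

// Read Parquet -> FlatMap -> Filter live rows
DataSet<Record> live = readParquet(env, parquetPath)
        .flatMap(new ParseParquetFlatMap())
        .filter(new LiveRowFilter());

// Read Parquet -> FlatMap -> Filter dead rows
DataSet<Record> dead = readParquet(env, parquetPath)
        .flatMap(new ParseParquetFlatMap())
        .filter(new DeadRowFilter());

// CoGroup live rows with the validated avro records, union with dead rows, write Parquet
DataSet<Record> merged = live.coGroup(validated)
        .where("key").equalTo("key")
        .with(new MergeCoGroup());

writeParquet(merged.union(dead), outputPath);
env.execute();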

 

I don’t understand why this logic couldn’t run with a single task manager and just take longer. We’re having a lot of trouble trying to change the tuning to reduce the memory burn. We run the above pipeline with parallelism 40 for all 17 datasets in a single job.

 

We’re running this config now which is not really justifiable for what we’re doing.

 

20 nodes x 2 slots each, parallelism 40, 36GB memory per node = 720GB of heap…

 

Thanks

 


RE: Flink memory usage

Stefano Bortoli-3

Hi Billy,

 

The only suggestion I can give is to check your code very carefully for useless variable allocations, and foster reuse as much as possible. Don’t create a new collection on every map execution; instead clear and reuse it, reuse the collected output of the flatMap, and so on. In the past we ran long processes over a lot of data with little memory without problems, including many more complex co-groups, joins and so on, without any issue.
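
As a concrete sketch of that kind of reuse (names and types here are purely illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Allocate once in open() and reuse per record, instead of creating
// new objects on every flatMap() invocation.
public class TokenizeFlatMap extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private transient Tuple2<String, Integer> reuse;

    @Override
    public void open(Configuration parameters) {
        reuse = new Tuple2<>();
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String token : line.split(",")) {
            reuse.f0 = token;           // mutate the reused tuple rather than new Tuple2<>(...)
            reuse.f1 = token.length();
            out.collect(reuse);
        }
    }
}

The usual object-reuse caveats apply: only mutate emitted objects if you know the downstream operators do not hold on to them.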

 

My2c. Hope it helps.

 

Stefano

 


RE: Flink memory usage

Newport, Billy

Your reuse idea kind of implies that it’s a GC generation rate issue, i.e. it’s not collecting fast enough so it’s running out of memory versus heap that’s actually anchored, right?

 

 


RE: Flink memory usage

Stefano Bortoli-3

I think that if you have a lot of memory available, the GC gets kind of lazy. In our case, the issue was just the latency caused by the GC, because we were loading more data than could fit in memory. Hence optimizing the code gave us a lot of improvements. FlatMaps are also dangerous as objects can multiply beyond what you expect, making the co-group extremely costly. :-) A well-placed distinct() saves a lot of time and memory.

 

My point is that having worked with scarce resources I learned that almost all the time the issue was my code, not the framework.

 

Good luck.

 

Stefano

 


Re: Flink memory usage

Till Rohrmann
Hi Billy,

if you didn't split the different data sets up into different slot sharing groups, then your maximum parallelism is 40. Thus, it should be enough to assign 40^2 * 20 * 4 = 128000 network buffers. If that is not enough because you have more than 4 shuffling steps running in parallel, then you have to increase the last term.
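
For scale (assuming the default 32 KB network buffer segment size of this Flink version): 128,000 buffers * 32 KB is roughly 4 GB of memory reserved just for network buffers.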

OOM exceptions should actually only occur due to user code objects. Given that you have reserved a massive amount of memory for the network buffers, the remaining heap for the user code is probably very small. Try whether you can decrease the number of network buffers. Moreover, check whether your user code keeps references to objects somewhere which could cause the OOM.
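
A low-effort way to see what is actually anchoring the heap when it dies (assuming standard HotSpot flags; the path below is just an example) is to have the JVM dump the heap on OOM via env.java.opts in flink-conf.yaml and inspect the dump with a heap analyzer:

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp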

Cheers,
Till 


RE: Flink memory usage

Newport, Billy

Ok

The consensus seems to be that it’s us, not Flink :-) So we’ll look harder at what we’re doing in case there is anything silly. We are using 16K network buffers BTW, which is around 0.5GB with the defaults.

 

 


Re: Flink memory usage

Gábor Gévay
Hello,

You could also try using a profiler that shows what objects are using
what amount of memory. E.g., JProfiler or Java Flight Recorder [1].
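
For example, one way to attach Flight Recorder to the task managers (assuming an Oracle JDK 8; the recording settings below are only an example) is through env.java.opts in flink-conf.yaml:

env.java.opts: -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=300s,filename=/tmp/flink-tm.jfr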

Best,
Gábor

[1] https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks001.html







Re: Flink memory usage

Till Rohrmann
In reply to this post by Newport, Billy
Hi Billy,

if it's possible that you can share some parts of your code privately with me, then I can try to figure out what's going wrong.

Cheers,
Till
