Re: Flink memory usage

Posted by Gábor Gévay
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-memory-usage-tp12678p12706.html

Hello,

You could also try using a profiler that shows which objects are consuming
how much memory, e.g., JProfiler or Java Flight Recorder [1].
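
On Oracle JDK 8, for example, you could enable Flight Recorder on the
TaskManagers through flink-conf.yaml (a minimal sketch; the recording
duration and file path below are just placeholders):

    env.java.opts: -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=120s,filename=/tmp/tm.jfr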

Best,
Gábor

[1] https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks001.html

On Thu, Apr 20, 2017 at 6:00 PM, Newport, Billy <[hidden email]> wrote:

> Ok
>
> The consensus seems to be that it’s us, not Flink :-) So we’ll look harder at
> what we’re doing in case there is anything silly. We are using 16K network
> buffers, BTW, which is around 0.5GB with the defaults.
>
>
>
>
>
> From: Till Rohrmann [mailto:[hidden email]]
> Sent: Thursday, April 20, 2017 11:52 AM
> To: Stefano Bortoli
> Cc: Newport, Billy [Tech]; Fabian Hueske; [hidden email]
>
>
> Subject: Re: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> if you didn't split the different data sets up into different slot sharing
> groups, then your maximum parallelism is 40. Thus, it should be enough to
> assign 40^2 * 20 * 4 = 128000 network buffers. If that is not enough because
> you have more than 4 shuffle steps running in parallel, then you have to
> increase the last term.
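>
> In flink-conf.yaml that would be, for example (a sketch; with the default
> 32 KB segment size, 128000 buffers reserve roughly 4 GB across the cluster):
>
>     taskmanager.network.numberOfBuffers: 128000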
>
>
>
> OOM exceptions should actually only occur due to user code objects. Given
> that you have reserved a massive amount of memory for the network buffers,
> the remaining heap for the user code is probably very small. Try whether you
> can decrease the number of network buffers. Moreover, check whether your
> user code keeps references anywhere to objects which could cause the OOM.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Apr 20, 2017 at 5:42 PM, Stefano Bortoli
> <[hidden email]> wrote:
>
> I think that if you have a lot of memory available, the GC gets kind of
> lazy. In our case, the issue was just the latency caused by the GC, because
> we were loading more data than could fit in memory. Hence optimizing the
> code gave us a lot of improvement. FlatMaps are also dangerous, as objects
> can multiply beyond expectations, making a co-group extremely costly. :-) A
> well-placed distinct() saves a lot of time and memory.
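>
> For example (purely schematic; "left" and "expanded" stand in for two
> intermediate DataSets keyed on field 0, and MergeFn for a CoGroupFunction):
>
>     // dedupe the flatMap output before the co-group, so duplicates don't
>     // multiply the number of pairs the co-group has to process
>     DataSet<Tuple2<String, Long>> deduped = expanded.distinct(0);
>     left.coGroup(deduped)
>         .where(0).equalTo(0)
>         .with(new MergeFn());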
>
>
>
> My point is that, having worked with scarce resources, I learned that almost
> all the time the issue was my code, not the framework.
>
>
>
> Good luck.
>
>
>
> Stefano
>
>
>
> From: Newport, Billy [mailto:[hidden email]]
> Sent: Thursday, April 20, 2017 4:46 PM
> To: Stefano Bortoli <[hidden email]>; 'Fabian Hueske'
> <[hidden email]>
>
>
> Cc: '[hidden email]' <[hidden email]>
> Subject: RE: Flink memory usage
>
>
>
> Your reuse idea kind of implies that it’s a GC generation-rate issue, i.e.,
> it’s not collecting fast enough and so runs out of memory, as opposed to heap
> that’s actually anchored, right?
>
>
>
>
>
> From: Stefano Bortoli [mailto:[hidden email]]
> Sent: Thursday, April 20, 2017 10:33 AM
> To: Newport, Billy [Tech]; 'Fabian Hueske'
> Cc: '[hidden email]'
> Subject: RE: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> The only suggestion I can give is to check your code very carefully for
> useless object allocations, and to foster reuse as much as possible. Don’t
> create a new collection on every map invocation; rather, clear and reuse the
> collected output of the flatMap, and so on (see the sketch below). In the
> past we ran long jobs over lots of data with little memory and had no
> problems, including many more complex co-groups, joins, and so on, without
> any issue.
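>
> A minimal sketch of that reuse pattern (the function and types here are
> illustrative, not from your job):
>
>     import org.apache.flink.api.common.functions.RichFlatMapFunction;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.util.Collector;
>
>     public class ParseLine extends RichFlatMapFunction<String, Tuple2<String, Long>> {
>         private transient Tuple2<String, Long> reuse;  // allocated once per task
>
>         @Override
>         public void open(Configuration parameters) {
>             reuse = new Tuple2<>();
>         }
>
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
>             // set the fields in place instead of allocating a new tuple per record
>             reuse.f0 = line;
>             reuse.f1 = 1L;
>             out.collect(reuse);  // safe as long as downstream doesn't hold the reference
>         }
>     }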
>
>
>
> My2c. Hope it helps.
>
>
>
> Stefano
>
>
>
> From: Newport, Billy [mailto:[hidden email]]
> Sent: Thursday, April 20, 2017 1:31 PM
> To: 'Fabian Hueske' <[hidden email]>
> Cc: '[hidden email]' <[hidden email]>
> Subject: RE: Flink memory usage
>
>
>
> I don’t think our functions are memory-heavy; they typically are cogroups
> that merge the records on the left with the records on the right.
>
>
>
> We’re currently requiring 720GB of heap to do our processing, which frankly
> appears ridiculous to us. Could too much parallelism be causing the problem?
> Looking at:
>
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html
>
>
>
> If we are processing 17 “datasets” in a single job and each has an
> individual parallelism of 40, is that a potential total parallelism of
> 17*40? And given your network-buffers calculation of parallelism squared,
> would that do it, or only if we explicitly configure it that way:
>
>
>
> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>
>
> where p is the maximum parallelism of the job and t is the number of task
> managers.
>
> You can process more than one parallel task per TM if you configure more
> than one processing slot per machine (taskmanager.numberOfTaskSlots). The
> TM will divide its memory among all its slots. So it would be possible to
> start one TM for each machine with 100GB+ memory and 48 slots each.
>
>
>
> Our pipeline for each dataset looks like this:
>
>
>
> Read Avro file -> FlatMap -> validate each record with a flatMap --- }
>                                                                      } CoGroup --- }
> Read Parquet -> FlatMap -> filter live rows ------------------------ }            } Union -> write the
>                                                                                    }  result to a
> Read Parquet -> FlatMap -> filter dead rows -------------------------------------- }  Parquet file
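>
> In DataSet API terms the shape is roughly this (purely schematic; the
> sources, the sink, the key position, and all the functions below are
> placeholders, not our real code):
>
>     DataSet<Tuple2<String, String>> validated =
>         avro.flatMap(new Validate());                       // read Avro, validate
>     DataSet<Tuple2<String, String>> live =
>         parquet.flatMap(new Parse()).filter(new IsLive());  // live rows
>     DataSet<Tuple2<String, String>> dead =
>         parquet.flatMap(new Parse()).filter(new IsDead());  // dead rows
>     live.coGroup(validated).where(0).equalTo(0)
>         .with(new Merge())                                  // merge left with right
>         .union(dead)
>         .output(parquetSink);                               // write to Parquet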
>
>
>
> I don’t understand why this logic couldn’t run with a single task manager
> and just take longer. We’re having a lot of trouble trying to change the
> tuning to reduce the memory burn. We run the above pipeline with parallelism
> 40 for all 17 datasets in a single job.
>
>
>
> We’re running this config now, which is not really justifiable for what
> we’re doing.
>
>
>
> 20 nodes * 2 slots = parallelism 40, with 36GB mem per node = 720GB of heap…
>
>
>
> Thanks
>
>
>
> From: Fabian Hueske [mailto:[hidden email]]
> Sent: Wednesday, April 19, 2017 10:52 AM
> To: Newport, Billy [Tech]
> Cc: [hidden email]
> Subject: Re: Flink memory usage
>
>
>
> Hi Billy,
>
> Flink's internal operators are implemented to not allocate heap space
> proportional to the size of the input data.
>
> Whenever Flink needs to hold data in memory (e.g., for sorting or building a
> hash table) the data is serialized into managed memory. If all memory is in
> use, Flink starts spilling to disk. This blog post discusses how Flink uses
> its managed memory [1] (still up to date, even though it's almost 2 years
> old).
>
> The runtime code should actually be quite stable. Most of the code has been
> there for several years (even before Flink was donated to the ASF) and we
> haven't seen many bugs reported for the DataSet runtime. Of course this does
> not mean that the code doesn't contain bugs.
>
>
>
> However, Flink does not manage the memory used by user code. For example, a
> GroupReduceFunction that collects a lot of data, e.g., in a List on the
> heap, can still kill a program.
>
> I would check if you have user functions that require lots of heap memory.
>
> Also, reducing the size of the managed memory to leave more heap space
> available might help.
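>
> For example, in flink-conf.yaml (the value is only an example; the default
> fraction is 0.7):
>
>     # give managed memory 50% of the free heap instead of the default 70%,
>     # leaving more room for user code
>     taskmanager.memory.fraction: 0.5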
>
> If that doesn't solve the problem, it would be good if you could share some
> details about your job (which operators, which local strategies, how many
> operators) that might help to identify the misbehaving operator.
>
>
>
> Thanks, Fabian
>
>
> [1]
> https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
>
>
>
> 2017-04-19 16:09 GMT+02:00 Newport, Billy <[hidden email]>:
>
> How does Flink use memory? We’re seeing cases when running a job on larger
> datasets where it throws OOM exceptions during the job. We’re using the
> DataSet API. Shouldn’t Flink be streaming from disk to disk? We work around
> it by using fewer slots, but it seems unintuitive that I need to change these
> settings given Flink != Spark. Why isn’t Flink’s memory usage constant? Why
> couldn’t I run a job with a single task and a single slot for any size of job
> successfully, other than it taking much longer to run?
>
>
>
> Thanks
>
> Billy
>