Hello everyone,
I have a question about the spilling behavior of a Flink batch job. The relevant part is a standard map-reduce, aggregating 4 billion Tuple3<Integer, Integer, Integer> records via a groupBy(0,1).sum(2). Not much else happens in the job.

The problem is that I don't understand why this job spills to disk. In this example the spilling is not really an issue, but we run the same job with much larger datasets, where we simply run out of disk space. So we're trying to understand better what it spills and what we can do about it.

In this example, I am running on AWS EMR (Flink 1.3.1) on a machine with 240GB of memory. I tweaked the following parameters:

yarn.heap-cutoff-ratio: 0.1
taskmanager.memory.fraction: 0.9
taskmanager.network.numberOfBuffers: 32768

This leads to 170GB of Flink managed memory, which in my opinion should suffice for the amount of data (the data going from the combine to the reduce is roughly 80GB). However, the job spills over 70GB to disk.

Do you have a hint as to why this could be the case, and can you explain what exactly is written out during such a group-reduce?

Thank you so much for your input,
best regards

Konstantin

--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Amtsgericht München, HRB 135082
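For reference, a minimal self-contained sketch of the job structure described above (the class name and the tiny in-memory source are placeholders for illustration; the real job reads the 4 billion records from its actual source):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class GroupSumSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tiny stand-in for the real source, which produces ~4 billion records.
        DataSet<Tuple3<Integer, Integer, Integer>> input = env.fromElements(
                Tuple3.of(1, 1, 10),
                Tuple3.of(1, 1, 5),
                Tuple3.of(2, 3, 7));

        // Group on the first two fields and sum the third, as in the job above.
        DataSet<Tuple3<Integer, Integer, Integer>> summed = input
                .groupBy(0, 1)
                .sum(2);

        // print() triggers execution of the batch plan.
        summed.print();
    }
}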
Hi Konstantin,

Flink's sort algorithm works as follows:

- Each memory-consuming task (such as sort or combine) has a memory budget which depends on the number of operators in the plan, the TM managed memory, and the number of slots of the TM. Each TM slot has the same fraction of the overall TM memory. If there are two memory-consuming operators (combine and sort), each of their tasks gets 50% of the slot memory. So if you have a TM with 40GB and 4 slots, each slot has 10GB and each task 5GB.
- The sorted stream is produced by merging the sorted and spilled records.

There are a few reasons that might cause spilling:

1) The spilling threshold is too tight. For example, to sort 10GB in memory (in a single task), you need roughly 14.3GB of sorter memory (10GB / 0.7). The idea here is to start spilling early enough that the first buffer is empty before the third buffer is filled and we have to block the input. I'm not sure if it is easily possible to tweak the threshold.

2) The data might be skewed. Something that you could try is a hash-based combiner, which can help to improve the combine rate if you have a rather low number of distinct keys. Hash combiners have to be explicitly chosen and are only available for ReduceFunctions. So you would have to implement the sum as a ReduceFunction and hint the hash combiner like this:

input.groupBy(0, 1).reduce(new SumFunc()).setCombineHint(CombineHint.HASH)

Hope this helps,
Fabian
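To make the suggested change concrete, here is a sketch of what the hash-combine variant could look like for the Tuple3 dataset from the job above (the class and method names are illustrative, not from the original thread; CombineHint is the enum nested in ReduceOperatorBase in the DataSet API):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;

public class HashCombineSketch {

    // Sums the third field of two tuples that share the same (f0, f1) key.
    public static class SumFunc implements ReduceFunction<Tuple3<Integer, Integer, Integer>> {
        @Override
        public Tuple3<Integer, Integer, Integer> reduce(
                Tuple3<Integer, Integer, Integer> a,
                Tuple3<Integer, Integer, Integer> b) {
            return new Tuple3<>(a.f0, a.f1, a.f2 + b.f2);
        }
    }

    // Replaces the sum(2) aggregation with a ReduceFunction and hints the hash combiner.
    public static DataSet<Tuple3<Integer, Integer, Integer>> sumWithHashCombine(
            DataSet<Tuple3<Integer, Integer, Integer>> input) {
        return input
                .groupBy(0, 1)
                .reduce(new SumFunc())
                .setCombineHint(CombineHint.HASH);
    }
}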