Trying to understand why a job is spilling despite huge memory being provided

Trying to understand why a job is spilling despite huge memory being provided

Konstantin Gregor
Hello everyone,

I have a question about the spilling behavior of a Flink batch job.

The relevant part is a standard map-reduce that aggregates 4 billion
Tuple3<Integer, Integer, Integer> records via a groupBy(0,1).sum(2).
Not much else happens in the job.
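For reference, the relevant part boils down to something like the following sketch (source, sink, and class name are just placeholders here, the real job of course reads from our actual input):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class GroupAndSumJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Placeholder source: in reality ~4 billion Tuple3<Integer, Integer, Integer> records.
            DataSet<Tuple3<Integer, Integer, Integer>> records = env
                    .readCsvFile("hdfs:///path/to/input")             // hypothetical path
                    .types(Integer.class, Integer.class, Integer.class);

            // Group by the first two fields and sum the third.
            records.groupBy(0, 1)
                   .sum(2)
                   .writeAsCsv("hdfs:///path/to/output");             // hypothetical sink

            env.execute("group-and-sum");
        }
    }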

The problem is that I don't understand why this job spills to disk. In
this example the spilling is not really an issue, but we run the same
job with much larger datasets, where we simply run out of disk space. So
we're trying to understand better what it spills and what we can do
about it.

In this example, I am running on AWS EMR (Flink 1.3.1) on a machine
with 240GB of memory. I tweaked the following parameters:

yarn.heap-cutoff-ratio: 0.1
taskmanager.memory.fraction: 0.9
taskmanager.network.numberOfBuffers: 32768

This leads to 170GB of Flink managed memory, which in my opinion should
suffice for the amount of data (roughly 80GB goes from the combine to
the reduce). However, the job spills over 70GB to disk.

Do you have a hint as to why this could be the case, and can you explain
what exactly is written into the state on such a group-reduce?

Thank you so much for your input,
best regards

Konstantin


--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Amtsgericht München, HRB 135082
Re: Trying to understand why a job is spilling despite huge memory being provided

Fabian Hueske-2
Hi Konstantin,

Flink's sort algorithm works as follows:

- Each memory-consuming task (such as sort or combine) has a memory budget which depends on the number of operators in the plan, the TM managed memory, and the number of slots of the TM. Each TM slot gets the same fraction of the overall TM memory. If there are two memory-consuming operators (combine and sort), each of their tasks gets 50% of the slot memory. So if you have a TM with 40GB and 4 slots, each slot has 10GB and each task 5GB.
- The sort splits its memory budget in three buffers.
- The first buffer is filled with incoming data. Once full, it is sorted and the second buffer is filled. When the second buffer is full, the third buffer is filled, and the second buffer is sorted once sorting of the first buffer has finished.
- Once the first buffer is sorted, Flink waits until a certain amount of data has been received (by default 70% of the sort's memory budget). When that happens, it starts spilling the first buffer to disk. Once the buffer has been spilled, it can be filled again.
- When all data has been read, the last buffer is only sorted but not spilled.
- The sorted stream is produced by merging the sorted and spilled records.
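To make that cycle a bit more concrete, here is a toy sketch of the idea. It is heavily simplified (single-threaded, a fresh list per buffer, no real I/O), so it is not Flink's actual sorter code, just an illustration of the fill / sort / spill / merge steps:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.PriorityQueue;

    // Toy model of the fill -> sort -> spill -> merge cycle described above.
    // The real sorter reuses a fixed set of three buffers, sorts asynchronously, and
    // blocks the input when no buffer is free; none of that is modeled here.
    public class SortSpillSketch {

        static final int BUFFER_SIZE = 4;           // records per buffer (tiny, for illustration)
        static final int NUM_BUFFERS = 3;           // the sort's budget is split into three buffers
        static final double SPILL_THRESHOLD = 0.7;  // spilling starts after 70% of the budget is received

        public static List<Integer> sortExternally(List<Integer> input) {
            List<List<Integer>> inMemoryRuns = new ArrayList<>(); // full, sorted buffers not yet spilled
            List<List<Integer>> spilledRuns = new ArrayList<>();  // stands in for sorted runs on disk
            List<Integer> current = new ArrayList<>(BUFFER_SIZE);
            long received = 0;
            long budget = (long) BUFFER_SIZE * NUM_BUFFERS;

            for (int record : input) {
                current.add(record);
                received++;
                if (current.size() == BUFFER_SIZE) {
                    Collections.sort(current);                    // sort the full buffer
                    inMemoryRuns.add(current);
                    current = new ArrayList<>(BUFFER_SIZE);       // continue filling the next buffer
                    if (received >= SPILL_THRESHOLD * budget) {
                        spilledRuns.addAll(inMemoryRuns);         // "spill" sorted runs, freeing their buffers
                        inMemoryRuns.clear();
                    }
                }
            }
            Collections.sort(current);                            // the last buffer is sorted but not spilled
            inMemoryRuns.add(current);

            List<List<Integer>> allRuns = new ArrayList<>(spilledRuns);
            allRuns.addAll(inMemoryRuns);
            return merge(allRuns);                                // merge spilled and in-memory sorted runs
        }

        // k-way merge of sorted runs via a min-heap of (value, runIndex, positionInRun).
        static List<Integer> merge(List<List<Integer>> runs) {
            PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
            for (int r = 0; r < runs.size(); r++) {
                if (!runs.get(r).isEmpty()) heap.add(new int[] {runs.get(r).get(0), r, 0});
            }
            List<Integer> merged = new ArrayList<>();
            while (!heap.isEmpty()) {
                int[] top = heap.poll();
                merged.add(top[0]);
                int r = top[1], next = top[2] + 1;
                if (next < runs.get(r).size()) heap.add(new int[] {runs.get(r).get(next), r, next});
            }
            return merged;
        }
    }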

There are a few reasons that might cause spilling.
1) The spilling threshold is too tight. For example, to sort 10GB in memory (in a single task), you need more than roughly 14.3GB of sorter memory (10GB / 0.7). The idea here is to start spilling early enough that the first buffer is empty again before the third buffer is filled and we have to block the input. (A small back-of-the-envelope calculation follows below.)
I'm not sure if it is easily possible to tweak the threshold.
2) The data might be skewed.
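To put some numbers on point 1), here is a small back-of-the-envelope check using the example figures from above (a hypothetical 40GB TM with 4 slots and two memory consumers per slot, not your actual setup):

    // Rough check with the example numbers from above (not the actual cluster configuration).
    public class SortBudgetCheck {
        public static void main(String[] args) {
            double tmManagedMemoryGb = 40.0;   // managed memory of one TaskManager
            int slotsPerTm = 4;                // number of slots of the TM
            int memoryConsumersPerSlot = 2;    // e.g. one combine and one sort in the same slot

            double perTaskBudgetGb =
                    tmManagedMemoryGb / slotsPerTm / memoryConsumersPerSlot; // = 5.0 GB

            double spillThreshold = 0.7;       // spilling starts at 70% of the sort's budget
            double dataPerTaskGb = 10.0;       // data a single sort task has to handle

            double neededToStayInMemoryGb = dataPerTaskGb / spillThreshold;  // ~14.3 GB
            boolean willSpill = perTaskBudgetGb < neededToStayInMemoryGb;    // true in this example

            System.out.printf("budget per task: %.1f GB, needed: %.1f GB, spills: %b%n",
                    perTaskBudgetGb, neededToStayInMemoryGb, willSpill);
        }
    }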

Something you could try is a hash-based combiner, which can help to improve the combine rate if you have a rather low number of distinct keys.
Hash combiners have to be explicitly chosen and are only available for ReduceFunctions.
So you would have to implement the sum as a ReduceFunction and hint the hash combiner like this:

input.groupBy(0, 1).reduce(new SumFunc()).setCombineHint(CombineHint.HASH)
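For completeness, roughly what that could look like (SumFunc is just a made-up name, and the surrounding types are the ones from your job):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple3;

    // Sums the third field; the grouping keys (f0, f1) are identical within a group.
    public class SumFunc implements ReduceFunction<Tuple3<Integer, Integer, Integer>> {
        @Override
        public Tuple3<Integer, Integer, Integer> reduce(
                Tuple3<Integer, Integer, Integer> a,
                Tuple3<Integer, Integer, Integer> b) {
            return new Tuple3<>(a.f0, a.f1, a.f2 + b.f2);
        }
    }

    // Usage, with the hash-based combiner hinted explicitly:
    // DataSet<Tuple3<Integer, Integer, Integer>> result =
    //         input.groupBy(0, 1).reduce(new SumFunc()).setCombineHint(CombineHint.HASH);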

Hope this helps,
Fabian
