backpressure and memory


seeksst

Hi, everyone:


I'm a Flink SQL user, and the version is 1.8.2.

Recently I have been confused about memory and backpressure. I have two jobs on YARN, and because they use more memory than allowed, they are frequently killed by YARN.

In the first job I have 3 TaskManagers and a parallelism of 6, and each TaskManager has 8 GB of memory. It reads from Kafka and uses one-minute tumbling windows to calculate PV and UV. There are many aggregation dimensions, and to avoid data skew it groups by deviceId, TUMBLE(event_time, INTERVAL '1' MINUTE). My question is: the checkpoint is just 60 MB and I give the job 24 GB of memory, so why was it killed by YARN? I use RocksDB as the state backend and the data is big, but I think that should trigger backpressure rather than running out of memory, yet backpressure is never triggered. The input buffer pool usage (inPoolUsage) is normally around 0.45.
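
For reference, the query looks roughly like this (the table and column names are simplified placeholders, not the real job):

    SELECT
      deviceId,
      TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
      COUNT(*) AS pv,                  -- page views
      COUNT(DISTINCT userId) AS uv     -- unique visitors
    FROM kafka_source
    GROUP BY deviceId, TUMBLE(event_time, INTERVAL '1' MINUTE)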

The other job looks different: I use 2 TaskManagers and a parallelism of 4, and each TaskManager has 20 GB of memory. I define an aggregate function to calculate complex data, grouped by date, hour and deviceId. It behaves like the first job: memory is exceeded and there is no backpressure. But the strange part is that when I read one day of data, only one TaskManager was killed by YARN, which confuses me. According to the dashboard I don't see any data skew, so why only one TaskManager?
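
Roughly, this second query looks like the following (again, the function, table and column names are placeholders):

    SELECT
      `date`,
      `hour`,
      deviceId,
      myComplexAgg(payload) AS metrics   -- custom AggregateFunction
    FROM kafka_source
    GROUP BY `date`, `hour`, deviceId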

Maybe these are the same question, maybe not, but I want to know more about how Flink uses memory, whether backpressure can slow down or stop the source, how it is triggered, and how RocksDB affects Flink.

Thanks for reading; any suggestions would be appreciated. Thank you.


Re: backpressure and memory

Arvid Heise
When YARN kills a job because of memory, it usually means that the job has used more memory than it requested. Since Flink's memory model consists not only of Java on-heap memory but also of RocksDB's off-heap (native) memory, it is usually harder to stay within the container limits. The general shortcomings have recently been addressed by FLIP-49 [1], available since 1.10.0. The usual solution is to plan for more off-heap memory [2].
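
For example, in flink-conf.yaml (a rough sketch; the values are only illustrative and depend on the job):

    # Flink 1.8.x/1.9.x on YARN: reserve a larger share of the container
    # for non-heap memory (RocksDB native memory, metaspace, ...)
    containerized.heap-cutoff-ratio: 0.4        # default 0.25
    containerized.heap-cutoff-min: 1024         # in MB, default 600

    # Flink 1.10+ (FLIP-49): size the memory pools explicitly
    taskmanager.memory.process.size: 8g         # total size of the YARN container
    taskmanager.memory.managed.fraction: 0.4    # share used as managed (RocksDB) memory
    state.backend.rocksdb.memory.managed: true  # keep RocksDB within the managed budget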

You also refer to an OOM, which is very different from a YARN kill. A Java OutOfMemoryError usually only happens if your program requires more on-heap memory than the JVM can provide, for example if you load large models into memory. If you actually experience an OOM, you should rather increase the on-heap memory for your program (and decrease Flink's managed on-heap memory).
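
For instance (again only a sketch), on 1.10+ lowering the managed fraction leaves more of the process size for the JVM heap:

    taskmanager.memory.process.size: 8g
    taskmanager.memory.managed.fraction: 0.2    # default 0.4; the freed share goes to the JVM heap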

In all cases, checkpointing does something completely different from what you expect [3]. In short, checkpointing periodically saves the state of your application so that when you experience a failure (such as an OOM or a YARN kill), it can resume from the stored state.
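
As a sketch, a typical RocksDB checkpointing setup looks like this (the path is a placeholder; on 1.8 the checkpoint interval itself is enabled in code, e.g. env.enableCheckpointing(60_000)):

    state.backend: rocksdb                            # keyed state lives in RocksDB
    state.checkpoints.dir: hdfs:///flink/checkpoints  # placeholder; must be durable storage
    state.backend.incremental: true                   # upload only changed SST files per checkpoint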

Finally, backpressure does not have much to do with memory consumption (except for network buffer usage, which is rather small with your configuration). Backpressure means that one part of your application cannot process data as fast as it arrives, so backpressure is only I/O- or CPU-induced, never memory-induced. It would only come into play here by prolonging checkpoint durations (potentially leading to high checkpointing latencies, which is always bad).

