Hi guys,
I am using Flink 1.7.2 and have a stateful streaming job that uses a keyed process function with the heap state backend. Although I set the TM heap size to 16 GB, I get an OOM error when the state size is around 2.5 GB (I get the state size from the dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe is for native calls off heap) [1]. With an 8 GB TM heap, the OOM errors start showing up when the state size reaches 1 GB. This I find puzzling, because I would expect to get a lot more heap space for state when I increase the size to 16 GB. What fraction of the heap is used by the framework? [2] Below is the stack trace for the exception. How can I increase my state size on the heap?

2020-08-21 02:05:54,443 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
2020-08-21 02:05:54,446 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: Java heap space
    at java.lang.reflect.Array.newInstance(Array.java:75)
    at java.util.Arrays.copyOf(Arrays.java:3212)
    at java.util.Arrays.copyOf(Arrays.java:3181)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

Best,
Vishwas
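For context, the job is set up roughly like this (a simplified sketch, not the actual code - the checkpoint URI, class name, and pipeline are placeholders):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapStateJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FsStateBackend is the "heap" backend: working state lives as Java objects
        // on the TaskManager heap, and only checkpoints go to the file system.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000);
        // ... source.keyBy(...).process(new MyKeyedProcessFunction()).addSink(...) goes here ...
        env.execute("keyed-process-job");
    }
}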
Hi Vishwas, According to the log, the heap space is 13+ GB, which looks fine. Several reasons might lead to the heap space OOM:
I would suggest taking a look at the GC logs. Thank you~ Xintong Song On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara <[hidden email]> wrote:
Hi Vishwas, If you use Flink 1.7, check the older memory model docs [1], because your reference [2] points to the new memory model of Flink 1.10. Could you also share a screenshot of where you get the state size of 2.5 GB? Do you mean the Flink WebUI? Generally, it is quite hard to estimate the on-heap size of state Java objects; I have never heard of such a Flink metric. Best, Andrey On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <[hidden email]> wrote:
Hi Andrey and Xintong, The 2.5 GB is from the Flink web UI (checkpoint size). I took a heap dump and could not find any memory leak in user code. I see similar behaviour with smaller heap sizes: on a 1 GB heap, the state size from the checkpoint UI is 180 MB. I am attaching some screenshots of heap profiles in case they help. When the state grows, GC takes a long time, and sometimes the job manager removes the TM slot because of a 10000 ms timeout and tries to restore the task on another task manager; this creates a cascading effect and affects other jobs running on the cluster. My tests were run on a single-node cluster with 1 TM and 4 task slots, with a parallelism of 4. Best, Vishwas On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin <[hidden email]> wrote:
Attachments: Screen Shot 2020-08-26 at 9.09.55 AM.jpg (131K), Screen Shot 2020-08-26 at 9.12.06 AM.jpg (83K), Screen Shot 2020-08-26 at 9.11.30 AM.jpg (111K), Screen Shot 2020-08-26 at 9.10.41 AM.jpg (146K)
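P.S. The GC stats in my first message already show how bad this gets: PS MarkSweep reports 8,720,945 ms over 265 collections, i.e. roughly 8720945 / 265 ≈ 33 seconds per full GC on average, which is well above the 10000 ms timeout.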
Hi Vishwas, I believe the screenshots are from a heap size of 1 GB? There are indeed many internal Flink state objects. They are overhead that Flink needs in order to organise and track the state on-heap. Depending on the actual size of your state objects, the overhead may be relatively large or small compared to the actual state size. For example, if you just keep integers in your state, then the overhead is probably a couple of times larger than the data itself. It is not easy to estimate the exact on-heap size without thorough analysis. The checkpoint has little overhead and includes only the actual state data - your serialized state objects, which are probably smaller than their heap representation. So my guess is that the heap representation of the state is much bigger than the checkpoint size. I am also cc'ing other people who might add more thoughts about on-heap state size. You could also provide GC logs as Xintong suggested. Best, Andrey On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara <[hidden email]> wrote:
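To put rough numbers on that (very roughly, and JVM-dependent - assuming a 64-bit JVM with compressed oops): a boxed java.lang.Integer already takes about 16 bytes on the heap versus 4 bytes serialized in a checkpoint, and on top of that each state entry pays for the state table entry object, key and namespace references, and so on, so a heap footprint several times the checkpoint size is expected.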
Hi Andrey, Thanks for getting back to me so quickly. The screenshots are for a 1 GB heap. The keys for the state are 20-character strings (20 bytes, we don't have multi-byte characters), so the overhead seems to be quite large (4x), even in comparison to the checkpoint size (which already adds an overhead). In this document [1] it says to use the FS/Heap backend for large states - is this quantifiable with respect to the JVM heap size on a single node, with the node not being used for other tasks? I have attached GC logs for the TM and JM. Best, Vishwas On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin <[hidden email]> wrote:
Attachments: flink-was-standalonesession-0-.gc.log.0.current (372K), flink-was-taskexecutor-0-.gc.log.0.current (6M)
Hi Vishwas, "is this quantifiable with respect to JVM heap size on a single node without the node being used for other tasks?" - I don't quite understand this question. I believe the recommendation in the docs has the same reasoning: use larger state objects so that the Java object overhead pays off. RocksDB keeps state in memory and on disk in serialized form, therefore it usually has a smaller footprint. Other jobs in the same task manager can potentially use another state backend, depending on their state requirements. All tasks in the same task manager share the JVM heap, as the task manager runs as one JVM process on the machine where it is deployed. Best, Andrey On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara <[hidden email]> wrote:
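Regarding the 4x you observe for your 20-character string keys, rough numbers (again assuming a 64-bit JVM with compressed oops): the String object plus its backing char[] is about 80 bytes on the heap, versus roughly 21 bytes serialized in a checkpoint - already a ~4x factor before counting the state table entry and the value object, so your observation is in line with plain Java object overhead.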
Thanks Andrey, My question is related to this part of the docs: "The FsStateBackend is encouraged for: Jobs with large state, long windows, large key/value states. All high-availability setups."
How large is "large state", without any overhead added by the framework? Best, Vishwas On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Vishwas, Your scenario sounds like one where RocksDB would actually be recommended. I would always suggest starting with RocksDB, unless your state is really small compared to the available memory, or you need to optimize for performance. And if your job runs fine with RocksDB (performance-wise), then there's no need to go into the details of heap memory management with Flink. On Wed, Aug 26, 2020 at 7:21 PM Vishwas Siravara <[hidden email]> wrote:
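If you want to give it a try, a minimal sketch of the switch could look like the following (the checkpoint URI is a placeholder, and the flink-statebackend-rocksdb dependency needs to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB keeps working state serialized off-heap / on local disk, so the heap
        // size no longer bounds the state size; 'true' enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // ... same keyBy(...).process(new MyKeyedProcessFunction()) pipeline as before ...
        env.execute("keyed-process-job-on-rocksdb");
    }
}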
Hi, The stack trace shows that the job failed while restoring from a checkpoint/savepoint. If you hit this during a failover, it may also help to find the root cause that triggered the failover in the first place. As for the stack itself: when restoring the `HeapPriorityQueue`, Flink ensures the backing array is large enough via resizeQueueArray [1] (which uses Arrays.copyOf), and that allocation is where the heap runs out - maybe this is the problem. Could you please take a heap dump when the process exits with the OOM? Best, Congxian Robert Metzger <[hidden email]> wrote on Thu, Aug 27, 2020 at 10:59 PM:
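(One way to capture that dump automatically is to start the TaskManager JVM with -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/some/dir, for example via the env.java.opts option in flink-conf.yaml - the dump path here is just a placeholder.)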