We recently migrated to Flink 1.10, but are experiencing some issues with memory.
Our cluster is: 1) Running inside of Kubernetes 2) Running in HA mode 3) Checkpointing/Savepointing to an HDFS cluster inside of Kubernetes 4) Using RocksDB for checkpointing 5) Running on m5d.4xlarge EC2 instances with 64gb of ram 6) The taskmanager pods do not have a memory limit set on them 7) We set taskmanager.memory.process.size to 48g We get the following error: 2020-05-27 10:12:34 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_376501a366f04bbaab99945c23a40da5_(28/32) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:694) at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:272) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:165) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102) at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201) at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152) at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:767) at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:823) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:883) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:727) at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84) at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:133) at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:187) at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180) at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167) at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279) ... 15 more Do I need to be more explicit with the off-heap memory for RocksDB? -Steve |
Hi Steve, RocksDB does not contribute to the JVM direct memory. RocksDB off-heap memory consumption is part of managed memory [1]. You got `OutOfMemoryError: Direct buffer memory` which is related to the JVM direct memory, also off-heap but managed by JVM. The JVM direct memory limit depends on the corresponding JVM argument of Flink process: -XX:MaxDirectMemorySize [2]. You can increase the direct memory limit by changing this configuration option: taskmanager.memory.task.off-heap.size [3]. Judging the stack trace you provided, the HDFS dependency of RocksDB state backend requires more JVM direct memory for remote communication to restore state. This can also mean that there are other consumers of direct memory in your application but it was the HDFS which hit the limit. See also the troubleshooting guide for `OutOfMemoryError: Direct buffer memory` [4]. Notice that Flink network buffers also use the JVM direct memory but Flink makes sure that they do not exceed their limit [5]. Best, Andrey On Wed, May 27, 2020 at 6:30 PM Steven Nelson <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |