Hi,

I ran into the following problem. This is my SQL:

    SELECT DISTINCT header, label, reqsig, dense_feat, item_id_feat, user_id_feat
    FROM app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc
    WHERE dt = '2021-04-01'

When I use the Blink planner in batch mode, it works well. But if I switch to streaming mode, it causes a heap OOM:

    Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
        at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
        at java.lang.Thread.run(Thread.java:748)

I use the RocksDB state backend, and I have confirmed that it works. I then ran jmap against the TaskManager:

     num     #instances         #bytes  class name
    ----------------------------------------------
       1:        214656     4420569368  [C
       2:        111199     2376771576  [B
       3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
       4:        214539        5148936  java.lang.String
       5:         31796        2635104  [Ljava.lang.Object;
       6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
       7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
       8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
       9:         32812        2099968  java.nio.DirectByteBuffer
      10:         14838        1651560  java.lang.Class
      11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
      12:         43014        1376448  java.util.Hashtable$Entry
      13:         32805        1312200  sun.misc.Cleaner

Almost all of the heap is taken by char[] ([C, about 4.4 GB) and byte[] ([B, about 2.4 GB) arrays, so it looks like the data is held on the heap rather than in RocksDB. Is there any way to make the data go to RocksDB instead?