Reading a Hive table in streaming mode with DISTINCT causes heap OOM


张颖

Hi, I ran into the following problem.

This is my SQL:
SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat FROM app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc where dt='2021-04-01'
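Assuming the streaming planner runs this DISTINCT as a group aggregation keyed on all six selected columns (the GroupAggFunction frames in the stack trace point that way), the operator has to remember every distinct row it has seen, and by default it never drops that state. The stdlib-only Java sketch below models that behaviour with an in-memory set. The class name, the processElement method and the sample rows are hypothetical; this is not how Flink stores state (in this job the state lives in the RocksDB backend), only an illustration of why the state grows with the number of distinct rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a streaming DISTINCT operator; not Flink code.
class StreamingDistinctModel {

    // One entry per distinct key ever seen. Nothing is ever removed,
    // so this grows with the number of distinct rows in the input.
    private final Set<List<String>> seenKeys = new HashSet<>();

    // Called once per incoming row. Returns true if the row should be
    // emitted, i.e. this exact combination of columns has not been seen yet.
    boolean processElement(List<String> row) {
        // The key is the whole row, because DISTINCT groups by every selected column.
        return seenKeys.add(row);
    }

    int stateSize() {
        return seenKeys.size();
    }

    public static void main(String[] args) {
        StreamingDistinctModel op = new StreamingDistinctModel();
        // Hypothetical rows shaped like (header, label, reqsig, dense_feat, item_id_feat, user_id_feat).
        List<List<String>> input = Arrays.asList(
                Arrays.asList("h1", "1", "sigA", "dense1", "item1", "user1"),
                Arrays.asList("h2", "0", "sigB", "dense2", "item2", "user2"),
                Arrays.asList("h1", "1", "sigA", "dense1", "item1", "user1"));
        List<List<String>> emitted = new ArrayList<>();
        for (List<String> row : input) {
            if (op.processElement(row)) {
                emitted.add(row);
            }
        }
        System.out.println(emitted.size()); // 2
        System.out.println(op.stateSize()); // 2
    }
}
```

In the real query every key carries all six columns, including the feature columns, so under this model the state is roughly the number of distinct rows times the size of one row.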


When I use the Blink planner in batch mode, it works well, but when I switch to streaming mode,
it causes a heap OOM.

Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
    at java.lang.Thread.run(Thread.java:748)
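The trace shows the OOM being thrown while the RocksDB backend builds the serialized lookup key (buildCompositeKeyNamespace -> DataOutputSerializer.getCopyOfBuffer -> Arrays.copyOf), not while writing values. As far as I understand the RocksDB backend, each state access serializes the key-group prefix, the current key and the namespace into a reusable buffer and then copies that buffer into a fresh byte[] on the heap. The stdlib-only sketch below models that pattern under those assumptions: the class and method names are hypothetical, the 2-byte key-group prefix, the length-prefixed UTF-8 encoding and the empty-string namespace are simplifications, and it is not Flink's serializer. It only shows that a key made of wide feature columns costs a heap allocation of the same size on every lookup.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of composite-key serialization; not Flink code.
class CompositeKeyModel {

    // Reused across calls, loosely mirroring a reusable output buffer.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    byte[] buildCompositeKey(int keyGroup, List<String> keyColumns, String namespace) {
        buffer.reset();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeShort(keyGroup); // key-group prefix (2 bytes assumed)
            for (String column : keyColumns) { // the grouping key: every DISTINCT column
                byte[] bytes = column.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
            byte[] ns = namespace.getBytes(StandardCharsets.UTF_8);
            out.writeInt(ns.length);
            out.write(ns);
            out.flush();
        } catch (IOException e) {
            // ByteArrayOutputStream does not actually throw; this only satisfies the API.
            throw new UncheckedIOException(e);
        }
        // toByteArray() allocates a new heap array sized to the whole key,
        // analogous to getCopyOfBuffer() in the trace.
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Hypothetical 10,000-character feature column.
        char[] wide = new char[10_000];
        Arrays.fill(wide, 'x');
        String denseFeat = new String(wide);
        List<String> key = Arrays.asList("header", "1", "sig", denseFeat, "items", "users");
        byte[] composite = new CompositeKeyModel().buildCompositeKey(7, key, "");
        System.out.println(composite.length); // 10050
    }
}
```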


I am using the RocksDB state backend and have confirmed that it is in effect. Then I took a jmap histogram of the TaskManager:
num     #instances         #bytes  class name
----------------------------------------------
   1:        214656     4420569368  [C
   2:        111199     2376771576  [B
   3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
   4:        214539        5148936  java.lang.String
   5:         31796        2635104  [Ljava.lang.Object;
   6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
   7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
   8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
   9:         32812        2099968  java.nio.DirectByteBuffer
  10:         14838        1651560  java.lang.Class
  11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
  12:         43014        1376448  java.util.Hashtable$Entry
  13:         32805        1312200  sun.misc.Cleaner
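One way to read the histogram: in JVM notation [C is char[] and [B is byte[], and dividing #bytes by #instances gives the average array size. The sketch below only does that arithmetic on the two top rows, with the numbers copied from the output above. The comment suggesting that the char[] arrays back the almost equal number of String objects is an inference from the counts, not something the histogram proves.

```java
// Arithmetic on the jmap histogram rows above; the class name is hypothetical.
class HistogramMath {

    // Average size in bytes of one instance, using integer division.
    static long averageBytes(long totalBytes, long instances) {
        return totalBytes / instances;
    }

    public static void main(String[] args) {
        long charArrays = averageBytes(4_420_569_368L, 214_656L); // row 1: [C (char[])
        long byteArrays = averageBytes(2_376_771_576L, 111_199L); // row 2: [B (byte[])
        // A char is 2 bytes, so this is roughly the average character count per
        // char[] (ignoring the array header). With 214539 String instances, these
        // are plausibly the backing arrays of very long strings.
        long approxChars = charArrays / 2;
        long topTwoMiB = (4_420_569_368L + 2_376_771_576L) / (1024L * 1024L);
        System.out.println(charArrays);  // 20593
        System.out.println(byteArrays);  // 21374
        System.out.println(approxChars); // 10296
        System.out.println(topTwoMiB);   // 6482
    }
}
```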


It looks like the data is held on the heap rather than in RocksDB. Is there any way to keep this data in RocksDB?




Re: Reading a Hive table in streaming mode with DISTINCT causes heap OOM

Shengkai Fang
Hi, could you tell me which Flink version you are using? I just want to check whether there is a known problem.

Best,
Shengkai

On Sun, Apr 25, 2021 at 5:23 PM, 张颖 <[hidden email]> wrote: