Hi all!
I cannot seem to find any setting to limit the number of records appended to a RocksDBListState, which is used when we combine SessionWindows with a ProcessFunction. It seems that each incoming element is appended to the stored value with the RocksDB `merge` operator, without any safeguard against unbounded growth. RocksDB's merge operator seems to support returning false in case of error, so I guess we could implement a limit by returning false in the merge operator, but since Flink seems to use the "stringappendtest" merge operator ( https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc ), it always returns true no matter what.

This is troublesome for us because it would make a lot of sense to specify an acceptable limit on how many elements can be aggregated under a given key, and because when we happen to have too many elements we get an exception from RocksDB:

```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
    ... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
    ... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which ensures that we only store one element per key, but this gives us degraded performance (a sketch of this workaround is shown below).

Thanks for your input!
Robin
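A minimal sketch of the Reduce-based workaround, assuming a hypothetical Event type with a key accessor and an associative merge function (the session gap is arbitrary):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// With reduce(), the window state keeps a single accumulated value per
// key/window instead of a RocksDB list that grows with every element.
DataStream<Event> output = events                      // events: DataStream<Event> (hypothetical)
    .keyBy(event -> event.getKey())                    // hypothetical key accessor
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) // arbitrary gap
    .reduce((a, b) -> Event.merge(a, b));              // hypothetical associative merge
```

The trade-off is exactly the one described above: reduce() keeps the state small, but forces eager aggregation instead of collecting the raw elements for the ProcessFunction.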
Hi Robin
First of all, the root cause is not that RocksDB cannot store a large list state when you merge, but the JNI limitation of 2^31 bytes [1].
Moreover, the RocksDB Java API does not return anything when you call the merge [2] operator.
Did you merge too many elements, or just elements that are individually too large? Last but not least, even if you could merge such a large list, I think getting back a value larger than 2^31 bytes would not behave well anyway.
Best
Yun Tang
Hi Yun, thanks for your answer! And sorry, I hadn't seen this limitation in the documentation; it makes sense!
In our case, we are merging too many elements (each element is limited to 4 MiB in our Kafka topic). I agree we do not want our state to contain really big values, which is why we are trying to find a way to put a limit on the number (or total size) of elements that are aggregated in the window's state.
We have found a way to do this by using another session window placed before the main one, which stores the number of messages for each key and rejects new messages once a limit is reached, but we are wondering whether there is a better way to achieve this without creating another state.
Thanks again,
Robin
Hi Robin
I think you could record the size of your list under the current key with another value state, or with operator state (store a Map of <key-by key, list length> and write the whole map into a list when snapshotting). If you do not have many key-by keys, operator state is a good choice, as it is on-heap and lightweight. A rough sketch of the value-state variant is shown below.
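A minimal sketch of this counting idea, assuming a KeyedProcessFunction over a hypothetical Event type keyed by String, with an arbitrary limit:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Caps how many elements are appended to the RocksDB-backed list per key.
public class BoundedListFunction extends KeyedProcessFunction<String, Event, Event> {

    private static final int MAX_ELEMENTS = 100_000; // arbitrary limit

    private transient ListState<Event> buffer;
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        Integer current = count.value();
        int n = current == null ? 0 : current;
        if (n < MAX_ELEMENTS) {      // only append while under the cap
            buffer.add(value);
            count.update(n + 1);
        }
        // else: drop the element (or route it to a side output) instead of
        // letting the RocksDB value grow towards the 2^31-byte JNI limit
    }
}
```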
Best
Yun Tang
Hi

As you described, I'm not sure whether MapState can help in your case, but MapState serializes each <mapKey, mapValue> pair separately, so it would not encounter the same problem as ListState. When using MapState, you need to decide how to set the mapKey: if the whole state is cleared after it has been processed, you can use a monotonically increasing integer as the mapKey and store the highest used mapKey in a value state. A rough sketch of this idea is shown below.

Best,
Congxian
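A minimal sketch of this MapState idea, again assuming a hypothetical Event type keyed by String:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers elements in a MapState under a monotonically increasing index.
// Each <index, element> entry is serialized as its own RocksDB key/value,
// so no single value grows with the number of buffered elements.
public class MapStateBufferFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient MapState<Integer, Event> elements;
    private transient ValueState<Integer> nextIndex;

    @Override
    public void open(Configuration parameters) {
        elements = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("elements", Integer.class, Event.class));
        nextIndex = getRuntimeContext().getState(
                new ValueStateDescriptor<>("nextIndex", Integer.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        Integer idx = nextIndex.value();
        int i = idx == null ? 0 : idx;
        elements.put(i, value);   // one RocksDB entry per element
        nextIndex.update(i + 1);  // remember the next unused map key
    }
}
```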
Hi Yun and Congxian! I have implemented a pre-filter that uses a keyed state (AggregatingState[Long]) to compute the total size of all records seen for each key, which lets me filter out records that would be too big for the RocksDB JNI bridge. This seems to make our job behave better! A rough sketch of the filter is shown below. Thanks for your help guys, this was really helpful :) Robin
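A minimal sketch of such a pre-filter, assuming records arrive as byte arrays and using an arbitrary per-key byte budget:

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Keyed pre-filter: accumulates the byte size of all records seen per key
// and drops further records once the budget is exhausted.
public class SizeBudgetFilter extends RichFilterFunction<byte[]> {

    private static final long MAX_BYTES_PER_KEY = 1L << 30; // arbitrary budget below the 2^31 JNI limit

    private transient AggregatingState<Long, Long> totalSize;

    @Override
    public void open(Configuration parameters) {
        totalSize = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>("totalSize",
                        new AggregateFunction<Long, Long, Long>() {
                            @Override public Long createAccumulator() { return 0L; }
                            @Override public Long add(Long value, Long acc) { return acc + value; }
                            @Override public Long getResult(Long acc) { return acc; }
                            @Override public Long merge(Long a, Long b) { return a + b; }
                        },
                        Long.class));
    }

    @Override
    public boolean filter(byte[] record) throws Exception {
        Long seen = totalSize.get();
        if (seen != null && seen > MAX_BYTES_PER_KEY) {
            return false;                    // budget exhausted for this key: drop
        }
        totalSize.add((long) record.length); // count this record against the budget
        return true;
    }
}
```

Since filter() returns a plain DataStream, the stream has to be keyed again before the session window, e.g. `stream.keyBy(...).filter(new SizeBudgetFilter()).keyBy(...).window(...)`.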
Great to hear that.

Best,
Congxian