Hi,
We have several Flink jobs, all of which read data from Kafka, do some aggregations (over sliding windows of (1d, 1h)) and write the results to Cassandra. Something like:

```
DataStream<String> lines = env.addSource(new FlinkKafkaConsumer010( … ));
DataStream<Event> events = lines.map(line -> parse(line));
DataStream<Statistics> stats = events
    .keyBy("id")
    .timeWindow(1d, 1h)
    .sum(new MyAggregateFunction());
writeToCassandra(stats);
```

We recently made the switch to the RocksDBStateBackend for its suitability for large states/long windows. However, after making the switch a memory issue has come up: the memory utilisation on the TaskManager gradually increases from 50 GB to ~63 GB until the container is killed. We are unable to figure out what is causing this behaviour. Is there some memory leak in RocksDB?

How much memory should we allocate to the Flink TaskManager? Since RocksDB is a native library and does not use the JVM, how much memory should we allocate/leave for RocksDB (out of 64 GB of total memory)? Is there a way to set the maximum amount of memory that RocksDB will use so that it doesn't overwhelm the system? Are there recommended settings for RocksDB for larger states (for the 1-day window the average state size is 3 GB)?

Any help would be greatly appreciated. I am using Flink v1.2.1. Thanks in advance.

Best,
Shashwat
Hi Shashwat,
Are you specifying the RocksDBStateBackend from flink-conf.yaml or from code? If you are specifying it from code, you can try using PredefinedOptions.FLASH_SSD_OPTIMIZED. You can also try enabling incremental checkpointing (this feature is in Flink 1.3.0).

If the above does not solve your issue, you can control the memory usage of RocksDB by tuning the following values and checking the performance (a code sketch follows below):

DBOptions (along with the FLASH_SSD options, add the following):
    maxBackgroundCompactions: 4

ColumnFamilyOptions:
    max_buffer_size: 512 MB
    block_cache_size: 128 MB
    max_write_buffer_number: 5
    minimum_buffer_number_to_merge: 2
    cacheIndexAndFilterBlocks: true
    optimizeFilterForHits: true

I would recommend reading the following documents:

Hope it helps.

Regards,
Vinay Patil
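For reference, here is a minimal sketch (not from the original mail) of how such values can be wired in through Flink's OptionsFactory for the RocksDB backend; the class name is made up, and the numbers simply mirror the suggestions above rather than verified recommendations for this workload:

```
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Illustrative options factory; register it via RocksDBStateBackend#setOptions(...).
public class TunedRocksDBOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // Allow more background compaction threads.
        return currentOptions.setMaxBackgroundCompactions(4);
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions) {
        // 128 MB block cache; also count index/filter blocks against the cache.
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCacheSize(128 * 1024 * 1024L)
                .setCacheIndexAndFilterBlocks(true);

        return currentOptions
                .setWriteBufferSize(512 * 1024 * 1024L)   // 512 MB memtable
                .setMaxWriteBufferNumber(5)
                .setMinWriteBufferNumberToMerge(2)
                .setOptimizeFiltersForHits(true)
                .setTableFormatConfig(tableConfig);
    }
}
```

It would then be registered on the backend with rocksDbBackend.setOptions(new TunedRocksDBOptionsFactory()), on top of setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED).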
In reply to this post by Shashwat Rastogi
Hi,
What is your task manager memory configuration? Can you post the TaskManager's log?

Regards,
Kien
Hi Vinay,
@Vinay: I am setting the RocksDBStateBackend from the code, not from flink-conf.yaml. I am currently trying out the configurations that you have shared; I'll let you know how they perform. Thank you so much for your help.

However, were you able to figure out what exactly is going wrong with the RocksDB setup? And, while making these modifications, did you notice any significant drop in RocksDB performance?

Overall, how do you think we should configure memory for the TaskManager? Is there a way to estimate how much memory is sufficient for the task manager heap and for the RocksDBStateBackend?

Thank you.

Best,
Shashwat
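For readers following along, a minimal sketch (not from the original mail) of configuring the RocksDB backend from code, assuming an upgrade to Flink 1.3.0 for the incremental-checkpointing flag mentioned earlier, and using a placeholder checkpoint URI:

```
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint URI; the boolean enables incremental checkpoints (Flink 1.3.0+).
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

        // Predefined tuning profile for flash/SSD storage, as suggested in this thread.
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

        env.setStateBackend(backend);

        // ... build the Kafka -> window -> Cassandra pipeline here ...
    }
}
```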
In reply to this post by Kien Truong
Hi Kien,
Sorry it took me some time to fetch the logs. I am attaching the logs of the machine which died due to lack of free memory.

I have set only:

```
taskmanager.heap.mb: 35840
taskmanager.numberOfTaskSlots: 8
```

The rest are just the default properties in flink-conf.yaml.

Thank you in advance.

Regards,
Shashwat
Attachment: c01-log-1300to1430.txt (79K)
Hello,

This is the system log, not the task manager's application log, which is what I was referring to. If you're using the standalone cluster, the task manager log should be in the logs directory inside your Flink installation.

Kien
In reply to this post by Shashwat Rastogi
Hi,
I see that matching the RocksDB configuration to fit certain container sizes can be very tedious and error prone for users. I have opened a JIRA to start improving the situation: https://issues.apache.org/jira/browse/FLINK-7289. Please feel free to comment and share your experiences or ideas; they might be very valuable input.

One consideration: from what you shared, I can see that you are using 8 slots per TaskManager and a heap size of 35840 MB. This means that there are potentially also up to 8 RocksDB instances on one TM. Furthermore, when you are using RocksDB, your heavy state will typically live in RocksDB (native memory) and no longer on the JVM heap. I think it would make a lot of sense to reduce your maximum heap size dramatically, so that more memory from your container budget is available as native memory for RocksDB. I hope this can also help with your problem.

Best,
Stefan
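As a concrete illustration of Stefan's suggestion (the numbers below are hypothetical and not taken from the thread), the flink-conf.yaml on a 64 GB host might reserve a much smaller JVM heap and leave the remainder of the container budget to RocksDB's native allocations (block cache, memtables, index and filter blocks):

```
# Hypothetical split for a 64 GB TaskManager host
taskmanager.heap.mb: 10240
taskmanager.numberOfTaskSlots: 8
```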