Re: Capacity Planning For Large State in YARN Cluster
Posted by Ashish Pokharel on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Capacity-Planning-For-Large-State-in-YARN-Cluster-tp16444p16472.html
Jorn, correct, and I suppose that's where we are at this point. The RocksDB-based backend is definitely looking promising for our use case. Since I haven't gotten a definite no-no on using 30% for the YARN cut-off ratio (about 1.8 GB of the 6 GB container memory) with the off-heap flag turned on, we will continue on that path. The current plan is to increase throughput on the input streams - the state streams are pretty much fully processed already and preserved in RocksDB, so we can control the streams joining against those states and monitor resource utilization plus join performance. We are seeing 200-500 ms processing times with a pretty decent amount of logging, which is good enough for our needs.
Agreed that estimating the size of state is hard - hence my original question about what others have done. Our states are essentially tuples (a few primitive values like String and long, plus a Map of String to String holding about 10-12 keys; the values are small - no more than 128 bytes each). We created a savepoint after processing about 500k records, and that's where my estimate came from. I'd be the first to admit it isn't accurate, but it's the best we could think of.
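The estimation approach described above (savepoint size divided by record count, sanity-checked against a per-tuple upper bound) can be sketched as plain arithmetic. The concrete byte figures below are illustrative assumptions, not numbers from this thread:

```java
// Rough per-record state-size estimate: take the savepoint size after a
// known number of records and divide. The savepoint size used in main()
// is a made-up example value.
public class StateSizeEstimate {
    static long perRecordBytes(long savepointBytes, long recordCount) {
        return savepointBytes / recordCount;
    }

    // Back-of-the-envelope serialized upper bound for one tuple: a few
    // primitives plus a 12-entry Map<String, String> whose values are
    // at most 128 bytes each (key length of ~16 bytes is assumed).
    static long tupleUpperBoundBytes() {
        long primitives = 8 + 64;          // one long + one short String (assumed)
        long mapEntries = 12 * (16 + 128); // 12 keys (~16 B) + values (<=128 B)
        return primitives + mapEntries;    // ~1.8 KB upper bound per record
    }

    public static void main(String[] args) {
        // e.g. a hypothetical 600 MB savepoint after 500k records
        System.out.println(perRecordBytes(600L * 1024 * 1024, 500_000)); // ~1258 B/record
        System.out.println(tupleUpperBoundBytes()); // 1800
    }
}
```

Note this only measures serialized size; the in-heap footprint of the same data is larger.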
Thanks, Ashish
Well, you can only performance-test it beforehand in different scenarios with different configurations.
I am not sure what exactly your state holds (e.g. how many objects), but if it consists of Java objects then a 3x overhead estimate might be a little low (it also depends on how you initially measured state size) - however, Flink optimizes this as well. Nevertheless, something like RocksDB is probably a better solution for larger states.
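To make the "3x" factor above concrete, here is a back-of-the-envelope heap footprint for one map entry versus its serialized payload. All per-object byte costs are typical 64-bit-JVM assumptions (compressed oops, pre-Java-9 char[]-backed Strings), not measured values:

```java
// Rough heap cost of a HashMap entry holding two Strings, compared to
// the raw serialized payload, to illustrate the Java object overhead
// factor discussed above. Byte constants are assumptions, not measurements.
public class HeapOverhead {
    static final int OBJ_HEADER = 16;      // object header, padded
    static final int REF = 4;              // compressed reference
    static final int STRING_OVERHEAD = 40; // String object + char[] header

    // Heap cost of one String of n ASCII characters (char[] backing)
    static long stringBytes(int n) {
        return STRING_OVERHEAD + 2L * n;
    }

    // Heap cost of one HashMap entry plus its key and value Strings
    static long entryBytes(int keyLen, int valueLen) {
        long entry = OBJ_HEADER + 3 * REF + 4; // key/value/next refs + cached hash
        return entry + stringBytes(keyLen) + stringBytes(valueLen);
    }

    public static void main(String[] args) {
        int keyLen = 8, valueLen = 128;
        long heap = entryBytes(keyLen, valueLen);   // 384 bytes on the heap
        long serialized = keyLen + valueLen;        // ~136 bytes serialized
        System.out.printf("heap=%d serialized=%d factor=%.1f%n",
                heap, serialized, (double) heap / serialized);
    }
}
```

Under these assumptions the in-heap representation is close to 3x the serialized payload, which is why heap-based state estimates derived from savepoint sizes tend to undershoot.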
Hi Till,
I got the same feedback from Robert Metzger over on Stack Overflow. I have switched my app to use RocksDB and yes, it did stabilize the app :)
However, I am still struggling with how to map out my TMs' and JM's memory, the number of slots per TM, etc. Currently I am using 60 slots across 10 TMs and 60 GB of total cluster memory. The idea was to distribute the state and give roughly 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio config to 0.3 to allow a little room for RocksDB (RocksDB is using the predefined spinning-disk-optimized options, though we do have SSDs on our prod machines that we can leverage in the future) and set taskmanager.memory.off-heap to true. It feels more like experimentation than exact science at this point :) If there are any further guidelines on how we can plan for this as we open the flood gates to heavy continuous streams, that would be great.
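For reference, the setup described above maps onto flink-conf.yaml entries roughly like this (the per-TM slot and heap values are derived from 60 slots across 10 TMs and 60 GB total; a sketch of this configuration, not a tuned recommendation):

```yaml
# Leave ~30% of each 6 GB container outside the JVM heap,
# giving RocksDB's native allocations room to breathe
containerized.heap-cutoff-ratio: 0.3
# Allocate Flink's managed memory off-heap
taskmanager.memory.off-heap: true
# 10 TMs x 6 slots = 60 slots, ~1 GB per slot
taskmanager.numberOfTaskSlots: 6
taskmanager.heap.mb: 6144
```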
Thanks again,
Ashish
Hi Ashish,
what you are describing should be a good use case for Flink and it should be able to run your program.
When you are seeing a GC overhead limit exceeded error, then it means that Flink or your program are creating too many/too large objects filling up the memory in a short time. I would recommend checking your user program to see whether you can avoid unnecessary object instantiations and whether it is possible to reuse created objects.
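The reuse advice above can be sketched in plain Java: instead of allocating a fresh result object per record, overwrite one mutable instance. The Result type and field names here are hypothetical, not from the thread:

```java
// Sketch of per-record object reuse to reduce GC pressure, as suggested
// above. A single mutable instance is overwritten for each record instead
// of allocating a new object every time.
public class ReuseExample {
    static final class Result {
        String key;
        long value;
    }

    // One reusable instance instead of a per-record allocation
    private final Result reusable = new Result();

    Result process(String key, long value) {
        reusable.key = key;     // overwrite fields in place
        reusable.value = value; // no new object -> less garbage to collect
        return reusable;
    }

    public static void main(String[] args) {
        ReuseExample op = new ReuseExample();
        Result a = op.process("a", 1);
        Result b = op.process("b", 2);
        // The same instance is returned each time, so downstream code must
        // copy it if it needs to hold onto the record - the same caveat
        // applies to Flink's own object-reuse mode.
        System.out.println(a == b);  // true
        System.out.println(b.value); // 2
    }
}
```

The trade-off is that callers may not cache the returned reference across calls; that contract is what makes the allocation savings safe.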
Concerning Flink's state backends, the memory state backend is currently not able to spill to disk. Also the managed memory is only relevant for DataSet/batch programs and not streaming programs. Therefore, I would recommend you to try out the RocksDB state backend which is able to gracefully spill to disk if the state size should grow too large. Consequently, you don't have to adjust the managed memory settings because they currently don't have an effect on streaming programs.
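A minimal way to make the switch recommended above is via flink-conf.yaml (the checkpoint path below is illustrative):

```yaml
# Use the RocksDB state backend, which spills state to local disk
# and checkpoints to a distributed filesystem
state.backend: rocksdb
# Where completed checkpoints are stored (path is an example)
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
```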
My gut feeling is that switching to the RocksDBStateBackend could already solve your problems. If this should not be the case, then please let me know again.
Cheers,
Till