Hi everyone,
my experience with RocksDBStatebackend have left me a little bit confused. Maybe you guys can confirm that my epxierence is the expected behaviour ;): I have run a "performancetest" twice, once with FsStateBackend and once RocksDBStatebackend in comparison. In this particular test the state saved is generally not large (in a production scenario it will be larger.) These are my observations: 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared to <<1MB with the FSStatebackend. 2. Throughput dropped from 28k/s -> 18k/s on a small cluster. 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference gets smaller for very large state. Can you confirm? 4. Checkpointing Times as reported in the Dashboard were 26secs for RocksDB during the test and <1 second for FsStatebackend. Does the reported time correspond to the sync. + asynchronous part of the checkpointing in case of RocksDB? Is there any way to tell how long the synchronous part takes? Form these first observations RocksDB does seem to bring a large overhead for state < 1GB, I guess? Is this expected? Cheers, Konstantin |
Hi, I'm going to try and respond to each point: 1. This seems strange, could you give some background on parallelism, number of operators with state and so on? Also, I'm assuming you are using the partitioned state abstraction, i.e. getState(), correct? 2. your observations are pretty much correct. The reason why RocksDB is slower is that the FsStateBackend basically stores the state in a Java HashMap and writes the contents to HDFS when checkpointing. RocksDB stores data in on-disk files and goes to them for every state access (of course there are caches, but generally it is like this). I'm actually impressed that it is still this fast in comparison. 3. see 1. (I think for now) 4. The checkpointing time is the time from the JobManager deciding to start a checkpoint until all tasks have confirmed that checkpoint. I have seen this before and I think it results from back pressure. The problem is that the checkpoint messages that we sent through the topology are sitting at the sources because they are also back pressured by the slow processing of normal records. You should be able to see the actual checkpointing times (both synchronous and asynchronous) in the log files of the task managers, they should be very much lower. I can go into details, I'm just writing this quickly before calling it a day. :-) Cheers, Aljoscha On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <[hidden email]> wrote: Hi everyone, |
Concerning the size of RocksDB snapshots - I am wondering if RocksDB simply does not compact for a long time, thus having a lot of stale data in the snapshot. That would be especially the case, if you have a lot of changing values for the same set of keys. On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Is it possible to add an option to store the state in the Java HashMap and write its content to RocksDB when checkpointing? For "hot" keys that are updated very frequently such optimization would help with performance. I know that you are also working on incremental checkpoints which would also be big win for jobs with a large number of keys. Thanks, Maxim. On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen <[hidden email]> wrote:
|
Hi Maxim, yes the plan is to have a cache of hot values that uses the managed memory abstraction of Flink so that we can make sure that we stay within memory bounds and don't run into OOM exceptions. On Tue, 12 Apr 2016 at 23:37 Maxim <[hidden email]> wrote:
|
This is something that my team and I have discussed building, so it's great to know that it's already on the radar. If we beat you to it, I'll definitely try to make it a contribution.
Shannon
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, April 13, 2016 at 1:46 PM To: <[hidden email]> Subject: Re: RocksDB Statebackend Hi Maxim,
yes the plan is to have a cache of hot values that uses the managed memory abstraction of Flink so that we can make sure that we stay within memory bounds and don't run into OOM exceptions.
On Tue, 12 Apr 2016 at 23:37 Maxim <[hidden email]> wrote:
|
That's interesting to hear. If you want we can also collaborate on that one. Using the Flink managed memory for that purpose would require some changes to lower layers of Flink. On Wed, 13 Apr 2016 at 13:11 Shannon Carey <[hidden email]> wrote:
|
In reply to this post by Aljoscha Krettek
Hi Aljoscha,
thanks for your answers. I am currently not in the office, so I can not run any further analysis until Monday. Just some quick answers to your questions. We are using the partitioned state abstraction, most of the state should correspond to buffered events in windows. Parallelism is 9. In terms of stateful operators we basically just have a KafkaSource, a custom stateful trigger as well as a RollingSink. Overall in this test scenario the state is very limited (see size of state using FsStateBackend). I will get back to you once, I have done some more experiments, which will be in the course of next week. Cheers, Konstantin On 12.04.2016 18:41, Aljoscha Krettek wrote: > Hi, > I'm going to try and respond to each point: > > 1. This seems strange, could you give some background on parallelism, > number of operators with state and so on? Also, I'm assuming you are > using the partitioned state abstraction, i.e. getState(), correct? > > 2. your observations are pretty much correct. The reason why RocksDB is > slower is that the FsStateBackend basically stores the state in a Java > HashMap and writes the contents to HDFS when checkpointing. RocksDB > stores data in on-disk files and goes to them for every state access (of > course there are caches, but generally it is like this). I'm actually > impressed that it is still this fast in comparison. > > 3. see 1. (I think for now) > > 4. The checkpointing time is the time from the JobManager deciding to > start a checkpoint until all tasks have confirmed that checkpoint. I > have seen this before and I think it results from back pressure. The > problem is that the checkpoint messages that we sent through the > topology are sitting at the sources because they are also back pressured > by the slow processing of normal records. You should be able to see the > actual checkpointing times (both synchronous and asynchronous) in the > log files of the task managers, they should be very much lower. > > I can go into details, I'm just writing this quickly before calling it a > day. :-) > > Cheers, > Aljoscha > > On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf > <[hidden email] <mailto:[hidden email]>> wrote: > > Hi everyone, > > my experience with RocksDBStatebackend have left me a little bit > confused. Maybe you guys can confirm that my epxierence is the expected > behaviour ;): > > I have run a "performancetest" twice, once with FsStateBackend and once > RocksDBStatebackend in comparison. In this particular test the state > saved is generally not large (in a production scenario it will be > larger.) > > These are my observations: > > 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared > to <<1MB with the FSStatebackend. > > 2. Throughput dropped from 28k/s -> 18k/s on a small cluster. > > 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for > FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference > gets smaller for very large state. Can you confirm? > > 4. Checkpointing Times as reported in the Dashboard were 26secs for > RocksDB during the test and <1 second for FsStatebackend. Does the > reported time correspond to the sync. + asynchronous part of the > checkpointing in case of RocksDB? Is there any way to tell how long the > synchronous part takes? > > Form these first observations RocksDB does seem to bring a large > overhead for state < 1GB, I guess? Is this expected? > > Cheers, > > Konstantin > -- Konstantin Knauf * [hidden email] * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082 |
Free forum by Nabble | Edit this page |