RocksDB Statebackend

RocksDB Statebackend

snntr
Hi everyone,

my experience with the RocksDBStateBackend has left me a little bit
confused. Maybe you guys can confirm that my experience is the expected
behaviour ;):

I have run a performance test twice for comparison, once with the
FsStateBackend and once with the RocksDBStateBackend. In this particular
test the saved state is generally not large (in a production scenario it
will be larger).

These are my observations:

1. The minimal checkpoint size (no records) with RocksDB was 33MB, compared
to <<1MB with the FsStateBackend.

2. Throughput dropped from 28k/s to 18k/s on a small cluster.

3. Checkpoint sizes as reported in the Dashboard were ca. 1MB for the
FsStateBackend but >100MB for the RocksDBStateBackend. I hope the difference
gets smaller for very large state. Can you confirm?

4. Checkpointing times as reported in the Dashboard were 26 seconds for
RocksDB during the test and <1 second for the FsStateBackend. Does the
reported time cover both the synchronous and the asynchronous part of the
checkpointing in the case of RocksDB? Is there any way to tell how long the
synchronous part takes?

From these first observations, RocksDB does seem to bring a large
overhead for state < 1GB, I guess? Is this expected?

Cheers,

Konstantin

Re: RocksDB Statebackend

Aljoscha Krettek
Hi,
I'm going to try and respond to each point:

1. This seems strange. Could you give some background on parallelism, the number of operators with state, and so on? Also, I'm assuming you are using the partitioned state abstraction, i.e. getState(), correct?

2. Your observations are pretty much correct. RocksDB is slower because the FsStateBackend basically stores the state in a Java HashMap and writes the contents to HDFS when checkpointing, whereas RocksDB stores data in on-disk files and goes to them for every state access (of course there are caches, but generally it is like this). I'm actually impressed that it is still this fast in comparison.

3. See 1. (I think, for now.)

4. The checkpointing time is the time from the JobManager deciding to start a checkpoint until all tasks have confirmed that checkpoint. I have seen this before and I think it results from back pressure. The problem is that the checkpoint messages that we send through the topology are sitting at the sources because they are also back pressured by the slow processing of normal records. You should be able to see the actual checkpointing times (both synchronous and asynchronous) in the log files of the task managers; they should be very much lower.
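
A minimal sketch of the arithmetic behind point 4, assuming a deliberately simplified model in which the dashboard figure is the time the checkpoint barrier spends queued behind back-pressured records plus the synchronous and asynchronous snapshot parts. The class and method names are hypothetical, and the backlog and snapshot durations are made-up illustration values, not measurements from this job; only the 18k/s rate is taken from the thread.

```java
// Toy model only: hypothetical names and numbers, not Flink APIs or measured values.
class CheckpointTimingSketch {

    /** Time a barrier waits behind records that are already queued in front of it. */
    static long barrierDelayMillis(long queuedRecords, long recordsPerSecond) {
        return queuedRecords * 1000L / recordsPerSecond;
    }

    /** End-to-end duration: from triggering the checkpoint until the last acknowledgement. */
    static long endToEndMillis(long barrierDelayMillis, long syncMillis, long asyncMillis) {
        return barrierDelayMillis + syncMillis + asyncMillis;
    }

    public static void main(String[] args) {
        // Assumed backlog of 450,000 records; processing rate of 18k/s from the thread.
        long delay = barrierDelayMillis(450_000L, 18_000L);
        // Assumed snapshot times: 200 ms synchronous, 800 ms asynchronous.
        long total = endToEndMillis(delay, 200L, 800L);
        System.out.println("barrier delay ms: " + delay);
        System.out.println("end-to-end ms:    " + total);
    }
}
```

With these assumed inputs the snapshot itself accounts for 1 second of a 26 second total, which is why the task manager logs are the place to look for the real synchronous and asynchronous times.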

I can go into more detail; I'm just writing this quickly before calling it a day. :-)

Cheers,
Aljoscha

In reply to the post by Konstantin Knauf of Tue, 12 Apr 2016 at 18:21.

Re: RocksDB Statebackend

Stephan Ewen
Concerning the size of RocksDB snapshots - I am wondering if RocksDB simply does not compact for a long time, thus having a lot of stale data in the snapshot.

That would especially be the case if you have a lot of changing values for the same set of keys.
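
A minimal sketch of this effect, assuming a toy append-only store in which every update adds a new version and only compaction drops the old ones. The classes are hypothetical and do not reflect the RocksDB API or file format; they only show how a snapshot taken before compaction can be far larger than the live state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a hypothetical append-only store, not the RocksDB API or file format.
class CompactionSketch {

    static final class Entry {
        final String key;
        final long value;

        Entry(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class AppendOnlyStore {
        private final List<Entry> log = new ArrayList<>();

        /** An update never overwrites in place; it appends a newer version. */
        void put(String key, long value) {
            log.add(new Entry(key, value));
        }

        /** Number of entries a file-level snapshot would have to copy. */
        int entriesOnDisk() {
            return log.size();
        }

        /** Compaction keeps only the newest version of every key. */
        void compact() {
            Map<String, Entry> latest = new LinkedHashMap<>();
            for (Entry e : log) {
                latest.put(e.key, e); // later entries replace earlier ones
            }
            log.clear();
            log.addAll(latest.values());
        }
    }

    public static void main(String[] args) {
        AppendOnlyStore store = new AppendOnlyStore();
        // Three keys whose values change 100 times each.
        for (long round = 0; round < 100; round++) {
            for (String key : new String[] {"a", "b", "c"}) {
                store.put(key, round);
            }
        }
        System.out.println("entries before compaction: " + store.entriesOnDisk());
        store.compact();
        System.out.println("entries after compaction:  " + store.entriesOnDisk());
    }
}
```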

In reply to the post by Aljoscha Krettek of Tue, Apr 12, 2016 at 6:41 PM.

Re: RocksDB Statebackend

Maxim
Is it possible to add an option to store the state in the Java HashMap and write its content to RocksDB when checkpointing? For "hot" keys that are updated very frequently such optimization would help with performance.
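
A minimal sketch of the cost difference behind this suggestion, assuming a toy model in which heap state keeps live objects in a HashMap while store-backed state converts values to and from bytes on every access. Both classes are hypothetical stand-ins, not Flink or RocksDB APIs, and a HashMap of byte arrays takes the place of the external store.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical classes, not Flink or RocksDB APIs.
class StateAccessSketch {

    /** Heap-style state: live objects in a HashMap, no serialization on access. */
    static final class HeapCounterState {
        private final Map<String, Long> counts = new HashMap<>();

        long increment(String key) {
            return counts.merge(key, 1L, Long::sum);
        }
    }

    /** Store-style state: every access converts between objects and bytes. */
    static final class SerializedCounterState {
        // A HashMap of byte[] stands in for the external store; keys stay Strings for brevity.
        private final Map<String, byte[]> store = new HashMap<>();
        private int conversions = 0;

        long increment(String key) {
            byte[] raw = store.get(key);
            long current = 0L;
            if (raw != null) {
                current = ByteBuffer.wrap(raw).getLong(); // deserialize on read
                conversions++;
            }
            long updated = current + 1;
            byte[] encoded = ByteBuffer.allocate(Long.BYTES).putLong(updated).array();
            store.put(key, encoded); // serialize on write
            conversions++;
            return updated;
        }

        int conversions() {
            return conversions;
        }
    }

    public static void main(String[] args) {
        HeapCounterState heap = new HeapCounterState();
        SerializedCounterState serialized = new SerializedCounterState();
        long heapResult = 0L;
        long serializedResult = 0L;
        // One "hot" key updated many times.
        for (int i = 0; i < 1000; i++) {
            heapResult = heap.increment("hot");
            serializedResult = serialized.increment("hot");
        }
        System.out.println("heap result:        " + heapResult);
        System.out.println("serialized result:  " + serializedResult);
        System.out.println("byte conversions:   " + serialized.conversions());
    }
}
```

Both variants compute the same counter; the second one simply pays a conversion on every read and write of the hot key.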

I know that you are also working on incremental checkpoints, which would also be a big win for jobs with a large number of keys.

Thanks,

Maxim.

In reply to the post by Stephan Ewen of Tue, Apr 12, 2016 at 10:39 AM.

Re: RocksDB Statebackend

Aljoscha Krettek
Hi Maxim,
Yes, the plan is to have a cache of hot values that uses the managed memory abstraction of Flink, so that we can make sure that we stay within memory bounds and don't run into OOM exceptions.
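
A minimal sketch of such a hot-value cache, assuming a plain LRU map bounded by entry count rather than by Flink's managed memory, which is a simplification to keep the example self-contained. All class and method names are hypothetical; a HashMap stands in for the slower store, values are written back on eviction, and everything still cached is flushed when a checkpoint is taken.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: hypothetical classes, not Flink's managed memory or the RocksDB API.
class HotValueCacheSketch {

    static final class CachedCounterState {
        private final Map<String, Long> backingStore = new HashMap<>(); // stand-in for the slow store
        private final LinkedHashMap<String, Long> hot;
        private int backingWrites = 0;

        CachedCounterState(final int maxHotEntries) {
            // accessOrder = true keeps the least recently used entry first.
            this.hot = new LinkedHashMap<String, Long>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                    if (size() > maxHotEntries) {
                        writeBack(eldest.getKey(), eldest.getValue()); // keep the evicted value
                        return true;
                    }
                    return false;
                }
            };
        }

        private void writeBack(String key, long value) {
            backingStore.put(key, value);
            backingWrites++;
        }

        long increment(String key) {
            Long cached = hot.get(key);
            long current;
            if (cached != null) {
                current = cached;
            } else {
                Long stored = backingStore.get(key); // cache miss: fall back to the store
                current = (stored == null) ? 0L : stored;
            }
            long updated = current + 1;
            hot.put(key, updated);
            return updated;
        }

        /** On checkpoint, push every cached value down (no dirty tracking in this sketch). */
        void flushForCheckpoint() {
            for (Map.Entry<String, Long> e : hot.entrySet()) {
                writeBack(e.getKey(), e.getValue());
            }
        }

        int backingWrites() {
            return backingWrites;
        }

        long storedValue(String key) {
            Long stored = backingStore.get(key);
            return (stored == null) ? 0L : stored;
        }
    }

    public static void main(String[] args) {
        CachedCounterState state = new CachedCounterState(2);
        for (int i = 0; i < 1000; i++) {
            state.increment("hot");
        }
        System.out.println("store writes before checkpoint: " + state.backingWrites());
        state.flushForCheckpoint();
        System.out.println("store writes after checkpoint:  " + state.backingWrites());
        System.out.println("stored value for hot key:       " + state.storedValue("hot"));
    }
}
```

The fixed entry limit is what keeps the cache within a known bound here; doing the same with managed memory would mean accounting for bytes instead of entries.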

In reply to the post by Maxim of Tue, 12 Apr 2016 at 23:37.

Re: RocksDB Statebackend

Shannon Carey
This is something that my team and I have discussed building, so it's great to know that it's already on the radar. If we beat you to it, I'll definitely try to make it a contribution.

Shannon

In reply to the post by Aljoscha Krettek of Wednesday, April 13, 2016 at 1:46 PM.

Re: RocksDB Statebackend

Aljoscha Krettek
That's interesting to hear. If you want, we can also collaborate on that one. Using the Flink managed memory for that purpose would require some changes to lower layers of Flink.

In reply to the post by Shannon Carey of Wed, 13 Apr 2016 at 13:11.

Re: RocksDB Statebackend

snntr
In reply to this post by Aljoscha Krettek
Hi Aljoscha,

thanks for your answers. I am currently not in the office, so I cannot
run any further analysis until Monday. Just some quick answers to your
questions.

We are using the partitioned state abstraction; most of the state should
correspond to buffered events in windows. Parallelism is 9. In terms of
stateful operators, we basically just have a KafkaSource, a custom
stateful trigger, and a RollingSink. Overall, in this test scenario
the state is very limited (see the size of the state using FsStateBackend).

I will get back to you once I have done some more experiments, which
will be in the course of next week.

Cheers,

Konstantin


--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082