RocksDB savepoint recovery performance improvements

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

RocksDB savepoint recovery performance improvements

Joey Pereira
Hey,

While running a Flink application with a large-state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out — savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB’s insert paths which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and avoid expensive operations in RocksDB. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it’s all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored. 

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (eg: for incremental checkpoint or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges.  "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion to an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:




[2]: TiDB, and TiKV which is the KV layer of the database, uses RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within:  https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there is not a lot of official resources on this. Notable references I found on this include:

https://github.com/facebook/rocksdb/issues/2473, which describes at a high-level how re-insertions work. 

https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, and specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

https://github.com/facebook/rocksdb/issues/5133, indicates re-ingesting the same SSTable (due to restarts in import processes), can cause issues for a particular set of options.

https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, indicates compaction occurs more (or, only) when overlapping SSTables are ingested. The thinking here is non-overlapping SSTable ingestion means very few operations (compaction, merging, etc) occur afterward, with the right tuning for generation and ingestion.

https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.
Reply | Threaded
Open this post in threaded view
|

Re: RocksDB savepoint recovery performance improvements

Yun Tang
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables as I planed to use RocksDB's benchmark to mock scenario in Flink. However, I found the main challenge is how to ensure the keys are inserted in a strictly increasing order. The key order in java could differ from the bytes order in RocksDB. In your case, I think it could be much easier as RocksFullSnapshotStrategy write data per columnfamily per key group which should be in a strictly increasing order [1].

FLINK-17288 could mitigate the performance and your solution could help improve the performance much better (and could integrate with state-processor-api story).

On the other hand, for out-of-box to use in production for your scenario, how about using checkpoint to recover, as it also supports rescale and normal recover.



Best
Yun Tang

From: Joey Pereira <[hidden email]>
Sent: Tuesday, May 19, 2020 2:27
To: [hidden email] <[hidden email]>
Cc: Mike Mintz <[hidden email]>; Shahid Chohan <[hidden email]>; Aaron Levin <[hidden email]>
Subject: RocksDB savepoint recovery performance improvements
 
Hey,

While running a Flink application with a large-state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out — savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB’s insert paths which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and avoid expensive operations in RocksDB. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it’s all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored. 

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (eg: for incremental checkpoint or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges.  "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion to an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:




[2]: TiDB, and TiKV which is the KV layer of the database, uses RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within:  https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there is not a lot of official resources on this. Notable references I found on this include:

https://github.com/facebook/rocksdb/issues/2473, which describes at a high-level how re-insertions work. 

https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, and specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

https://github.com/facebook/rocksdb/issues/5133, indicates re-ingesting the same SSTable (due to restarts in import processes), can cause issues for a particular set of options.

https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, indicates compaction occurs more (or, only) when overlapping SSTables are ingested. The thinking here is non-overlapping SSTable ingestion means very few operations (compaction, merging, etc) occur afterward, with the right tuning for generation and ingestion.

https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.
Reply | Threaded
Open this post in threaded view
|

Re: RocksDB savepoint recovery performance improvements

Joey Pereira
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <[hidden email]> wrote:
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables as I planed to use RocksDB's benchmark to mock scenario in Flink. However, I found the main challenge is how to ensure the keys are inserted in a strictly increasing order. The key order in java could differ from the bytes order in RocksDB. In your case, I think it could be much easier as RocksFullSnapshotStrategy write data per columnfamily per key group which should be in a strictly increasing order [1].

FLINK-17288 could mitigate the performance and your solution could help improve the performance much better (and could integrate with state-processor-api story).

On the other hand, for out-of-box to use in production for your scenario, how about using checkpoint to recover, as it also supports rescale and normal recover.



Best
Yun Tang

From: Joey Pereira <[hidden email]>
Sent: Tuesday, May 19, 2020 2:27
To: [hidden email] <[hidden email]>
Cc: Mike Mintz <[hidden email]>; Shahid Chohan <[hidden email]>; Aaron Levin <[hidden email]>
Subject: RocksDB savepoint recovery performance improvements
 
Hey,

While running a Flink application with a large-state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out — savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB’s insert paths which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and avoid expensive operations in RocksDB. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it’s all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored. 

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (eg: for incremental checkpoint or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges.  "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion to an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:




[2]: TiDB, and TiKV which is the KV layer of the database, uses RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within:  https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there is not a lot of official resources on this. Notable references I found on this include:

https://github.com/facebook/rocksdb/issues/2473, which describes at a high-level how re-insertions work. 

https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, and specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

https://github.com/facebook/rocksdb/issues/5133, indicates re-ingesting the same SSTable (due to restarts in import processes), can cause issues for a particular set of options.

https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, indicates compaction occurs more (or, only) when overlapping SSTables are ingested. The thinking here is non-overlapping SSTable ingestion means very few operations (compaction, merging, etc) occur afterward, with the right tuning for generation and ingestion.

https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.
Reply | Threaded
Open this post in threaded view
|

Re: RocksDB savepoint recovery performance improvements

Joey Pereira
Following up: I've put together the implementation, https://github.com/apache/flink/pull/12345. It's passing tests but is only partially complete, as it still needs some clean-up and configuration. I still need to try running this against a production cluster to check the performance, as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <[hidden email]> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <[hidden email]> wrote:
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables as I planed to use RocksDB's benchmark to mock scenario in Flink. However, I found the main challenge is how to ensure the keys are inserted in a strictly increasing order. The key order in java could differ from the bytes order in RocksDB. In your case, I think it could be much easier as RocksFullSnapshotStrategy write data per columnfamily per key group which should be in a strictly increasing order [1].

FLINK-17288 could mitigate the performance and your solution could help improve the performance much better (and could integrate with state-processor-api story).

On the other hand, for out-of-box to use in production for your scenario, how about using checkpoint to recover, as it also supports rescale and normal recover.



Best
Yun Tang

From: Joey Pereira <[hidden email]>
Sent: Tuesday, May 19, 2020 2:27
To: [hidden email] <[hidden email]>
Cc: Mike Mintz <[hidden email]>; Shahid Chohan <[hidden email]>; Aaron Levin <[hidden email]>
Subject: RocksDB savepoint recovery performance improvements
 
Hey,

While running a Flink application with a large-state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out — savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB’s insert paths which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and avoid expensive operations in RocksDB. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it’s all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored. 

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (eg: for incremental checkpoint or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges.  "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion to an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:




[2]: TiDB, and TiKV which is the KV layer of the database, uses RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within:  https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there is not a lot of official resources on this. Notable references I found on this include:

https://github.com/facebook/rocksdb/issues/2473, which describes at a high-level how re-insertions work. 

https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, and specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

https://github.com/facebook/rocksdb/issues/5133, indicates re-ingesting the same SSTable (due to restarts in import processes), can cause issues for a particular set of options.

https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, indicates compaction occurs more (or, only) when overlapping SSTables are ingested. The thinking here is non-overlapping SSTable ingestion means very few operations (compaction, merging, etc) occur afterward, with the right tuning for generation and ingestion.

https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.
Reply | Threaded
Open this post in threaded view
|

Re: RocksDB savepoint recovery performance improvements

Steven Wu
Yun, you mentioned that checkpoint also supports rescale. I thought the recommendation [1] is to use savepoint for rescale. 


On Tue, May 26, 2020 at 6:46 AM Joey Pereira <[hidden email]> wrote:
Following up: I've put together the implementation, https://github.com/apache/flink/pull/12345. It's passing tests but is only partially complete, as it still needs some clean-up and configuration. I still need to try running this against a production cluster to check the performance, as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <[hidden email]> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <[hidden email]> wrote:
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables as I planed to use RocksDB's benchmark to mock scenario in Flink. However, I found the main challenge is how to ensure the keys are inserted in a strictly increasing order. The key order in java could differ from the bytes order in RocksDB. In your case, I think it could be much easier as RocksFullSnapshotStrategy write data per columnfamily per key group which should be in a strictly increasing order [1].

FLINK-17288 could mitigate the performance and your solution could help improve the performance much better (and could integrate with state-processor-api story).

On the other hand, for out-of-box to use in production for your scenario, how about using checkpoint to recover, as it also supports rescale and normal recover.



Best
Yun Tang

From: Joey Pereira <[hidden email]>
Sent: Tuesday, May 19, 2020 2:27
To: [hidden email] <[hidden email]>
Cc: Mike Mintz <[hidden email]>; Shahid Chohan <[hidden email]>; Aaron Levin <[hidden email]>
Subject: RocksDB savepoint recovery performance improvements
 
Hey,

While running a Flink application with a large-state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out — savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB’s insert paths which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and avoid expensive operations in RocksDB. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it’s all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored. 

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (eg: for incremental checkpoint or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges.  "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion to an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:




[2]: TiDB, and TiKV which is the KV layer of the database, uses RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within:  https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there is not a lot of official resources on this. Notable references I found on this include:

https://github.com/facebook/rocksdb/issues/2473, which describes at a high-level how re-insertions work. 

https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, and specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

https://github.com/facebook/rocksdb/issues/5133, indicates re-ingesting the same SSTable (due to restarts in import processes), can cause issues for a particular set of options.

https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, indicates compaction occurs more (or, only) when overlapping SSTables are ingested. The thinking here is non-overlapping SSTable ingestion means very few operations (compaction, merging, etc) occur afterward, with the right tuning for generation and ingestion.

https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.
Reply | Threaded
Open this post in threaded view
|

Re: RocksDB savepoint recovery performance improvements

Yun Tang
[hidden email] I think you might need to create a new JIRA ticket and link your PR to the new issue as FLINK-17288 mainly focus on bulk load options while your solution focus on SST generator, if your solution could behave better, we could tag FLINK-17288 as "won't do".

[hidden email] sure, Flink community always suggest to use savepoint to restore but current checkpoint also support it. I mention that is for quick fix at his scenario.

Best
Yun Tang

From: Steven Wu <[hidden email]>
Sent: Wednesday, May 27, 2020 0:36
To: Joey Pereira <[hidden email]>
Cc: [hidden email] <[hidden email]>; Yun Tang <[hidden email]>; Mike Mintz <[hidden email]>; Shahid Chohan <[hidden email]>; Aaron Levin <[hidden email]>
Subject: Re: RocksDB savepoint recovery performance improvements
 
Yun, you mentioned that checkpoint also supports rescale. I thought the recommendation [1] is to use savepoint for rescale. 


On Tue, May 26, 2020 at 6:46 AM Joey Pereira <[hidden email]> wrote:
Following up: I've put together the implementation, https://github.com/apache/flink/pull/12345. It's passing tests but is only partially complete, as it still needs some clean-up and configuration. I still need to try running this against a production cluster to check the performance, as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <[hidden email]> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <[hidden email]> wrote:
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables as I planed to use RocksDB's benchmark to mock scenario in Flink. However, I found the main challenge is how to ensure the keys are inserted in a strictly increasing order. The key order in java could differ from the bytes order in RocksDB. In your case, I think it could be much easier as RocksFullSnapshotStrategy write data per columnfamily per key group which should be in a strictly increasing order [1].

FLINK-17288 could mitigate the performance and your solution could help improve the performance much better (and could integrate with state-processor-api story).

On the other hand, for out-of-box to use in production for your scenario, how about using checkpoint to recover, as it also supports rescale and normal recover.



Best
Yun Tang

From: Joey Pereira <[hidden email]>
Sent: Tuesday, May 19, 2020 2:27
To: [hidden email] <[hidden email]>
Cc: Mike Mintz <[hidden email]>; Shahid Chohan <[hidden email]>; Aaron Levin <[hidden email]>
Subject: RocksDB savepoint recovery performance improvements
 
Hey,

While running a Flink application with a large-state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out — savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB’s insert paths which can involve expensive operations[0].

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and avoid expensive operations in RocksDB. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it’s all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored. 

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (eg: for incremental checkpoint or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don’t have any specific sources on this. At a high-level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges.  "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion to an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:




[2]: TiDB, and TiKV which is the KV layer of the database, uses RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within:  https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there is not a lot of official resources on this. Notable references I found on this include:

https://github.com/facebook/rocksdb/issues/2473, which describes at a high-level how re-insertions work. 

https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, and specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

https://github.com/facebook/rocksdb/issues/5133, indicates re-ingesting the same SSTable (due to restarts in import processes), can cause issues for a particular set of options.

https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, indicates compaction occurs more (or, only) when overlapping SSTables are ingested. The thinking here is non-overlapping SSTable ingestion means very few operations (compaction, merging, etc) occur afterward, with the right tuning for generation and ingestion.

https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.