Question about Checkpoint Storage (RocksDB)

Sameer Wadkar
Hi,

My understanding of the RocksDB state backend is as follows:

When using the RocksDB state backend, checkpoints are first taken locally (on the TaskManager) using RocksDB's backup feature, which produces consistent read-only snapshots of the RocksDB database. Each checkpoint is written on the TaskManager node and then asynchronously copied to the remote HDFS location. When a checkpoint is committed, the corresponding records are deleted from RocksDB, which keeps the RocksDB data folders small and, in turn, keeps each snapshot relatively small. If the task node fails, I assume the RocksDB database is restored on a new node from the checkpoints in remote HDFS. Since each checkpoint's state is relatively small, the restoration time from HDFS for the RocksDB database on the new task node should also be relatively small.

The question is: with really long windows (hours), if the state of the window gets very large over time, would the size of the RocksDB instance grow accordingly? Would replication to HDFS start causing performance bottlenecks? Also, would this require a constant cycle (at each checkpoint interval?) of reading from RocksDB, adding more window elements, and writing back to RocksDB?

Outside of the read costs, is there a risk in having very long windows when you know they could collect a lot of elements? Is it safer instead to perform aggregations on top of aggregations, or to use your own custom remote store like HBase to persist the larger per-record state and use windows only to store the keys into HBase? I mention HBase because its support for column qualifiers allows elements to be added to the same key in multiple ordered column qualifiers, and reads can be throttled in batches of column qualifiers for better memory consumption. Is this approach used in practice?

Thanks,
Sameer

Re: Question about Checkpoint Storage (RocksDB)

Ufuk Celebi
On Mon, Jul 25, 2016 at 8:50 PM, Sameer W <[hidden email]> wrote:
> The question is: with really long windows (hours), if the state of the
> window gets very large over time, would the size of the RocksDB instance
> grow accordingly? Would replication to HDFS start causing performance
> bottlenecks? Also, would this require a constant cycle (at each checkpoint
> interval?) of reading from RocksDB, adding more window elements, and
> writing back to RocksDB?

Yes. The size of the RocksDB instance is directly correlated with the
number of K/V state pairs you store. You can remove state by calling
`clear()` on the KvState instance.
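
For illustration, a minimal sketch (the class, state name, and threshold are made up, not from your job) of a function that clears its keyed state once it has emitted a result:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Must run on a keyed stream, e.g. stream.keyBy(...).flatMap(new CountAndClear())
    public class CountAndClear extends RichFlatMapFunction<Long, Long> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Registers keyed state; with the RocksDB backend this entry lives in RocksDB
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("count", Long.class, 0L));
        }

        @Override
        public void flatMap(Long value, Collector<Long> out) throws Exception {
            long current = count.value() + 1;
            if (current >= 1000) {
                out.collect(current);
                count.clear(); // drops this key's entry from the state backend
            } else {
                count.update(current);
            }
        }
    }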

All state updates go directly to RocksDB and snapshots copy the DB
files (semi-async mode, current default) or iterate-and-copy all K/V
pairs (fully-async mode). No records are deleted automatically after
snapshots.

Snapshotting large RocksDB instances will cause some slowdown, but
you can trade this cost off by adjusting the checkpointing interval.
There are plans to do the snapshots in an incremental fashion in order
to lower the costs for this, but there is no design doc available for
it at this point.
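
For reference, the wiring could look roughly like this (the paths and interval are placeholders, and it needs the flink-statebackend-rocksdb dependency; the fully-async toggle is commented out since its availability depends on your Flink version):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // A longer interval means fewer snapshots of a (possibly large) RocksDB instance
            env.enableCheckpointing(60 * 1000);

            RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
            // backend.enableFullyAsyncSnapshots(); // iterate-and-copy mode, if your version exposes it
            env.setStateBackend(backend);

            // ... add sources, keyed windows, sinks, then call env.execute(...)
        }
    }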

> Outside of the read costs, is there a risk in having very long windows
> when you know they could collect a lot of elements? Is it safer instead to
> perform aggregations on top of aggregations, or to use your own custom
> remote store like HBase to persist the larger per-record state and use
> windows only to store the keys into HBase? I mention HBase because its
> support for column qualifiers allows elements to be added to the same key
> in multiple ordered column qualifiers, and reads can be throttled in
> batches of column qualifiers for better memory consumption. Is this
> approach used in practice?

RocksDB works quite well for large stateful jobs. If possible for your
use case, I would still recommend working with pre-aggregating window
functions (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions)
or pre-aggregating the data. The I/O costs will correlate with the
state size, but there is "no risk" in the sense that it will still
work as expected.
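
As a rough sketch of what I mean (the input, key, and window size are made up for illustration), a ReduceFunction on a keyed window keeps only one running value per key and window instead of buffering every element:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class PreAggregatedCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Toy bounded input just to keep the sketch self-contained; a real job
            // would read from an unbounded source (this processing-time window would
            // not fire before such a short toy job finishes).
            DataStream<Tuple2<String, Long>> events = env.fromElements(
                    Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

            events.keyBy(0)
                  .timeWindow(Time.hours(6))
                  .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                      @Override
                      public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                          // Only this running sum is kept as window state per key, not every element
                          return new Tuple2<>(a.f0, a.f1 + b.f1);
                      }
                  })
                  .print();

            env.execute("pre-aggregated window sketch");
        }
    }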

What you describe with HBase could work, but I'm not aware of someone
doing this. Furthermore, depending on your use case, it can cause
problems in failure scenarios, because you might need to keep HBase
and Flink state in sync.

Re: Question about Checkpoint Storage (RocksDB)

Sameer Wadkar
Thanks Ufuk,

That was very helpful. But that raised a few more questions :-):

1. Calling clear() on the KV state is only possible for snapshots, right? Do you control that for checkpoints too?

2. Assuming that the user has no control over the checkpoint process outside of controlling the checkpoint interval, when is RocksDB cleared of the operator state for checkpoints that are long past? It seems like only two checkpoints really need to be maintained: the current one and the previous one for restore. Does Flink clean up checkpoints on a timer? When it cleans up checkpoints, does it also clean up the state backend (I am assuming they are different)?

3. The pointer to pre-aggregating windows was very helpful, as the WindowFunction is then passed the pre-aggregated state. For windows, are the Reduce and Fold functions called on each element even before the window is triggered? I can see how that would work where the pre-computation is done per element but the actual output is emitted only when the window fires. But is that only possible if there are no Evictors defined on the window? Also, how are the elements fed to the Reduce/Fold function? Is it like MapReduce where, even if you are using an Iterator, not all the values for a key are actually buffered in memory? This ties back to how RocksDB is used to store large window state before the window is triggered. If my elements are accumulating in a window (serving a ReduceFunction), does the state spill to disk (RocksDB?) when a threshold size is reached?

Thanks,
Sameer

Re: Question about Checkpoint Storage (RocksDB)

Ufuk Celebi
On Tue, Jul 26, 2016 at 2:15 PM, Sameer W <[hidden email]> wrote:
> 1. Calling clear() on the KV state is only possible for snapshots, right?
> Do you control that for checkpoints too?

What do you mean by snapshots vs. checkpoints exactly?

> 2. Assuming that the user has no control over the checkpoint process
> outside of controlling the checkpoint interval, when is RocksDB cleared of
> the operator state for checkpoints that are long past? It seems like only
> two checkpoints really need to be maintained: the current one and the
> previous one for restore. Does Flink clean up checkpoints on a timer? When
> it cleans up checkpoints, does it also clean up the state backend (I am
> assuming they are different)?

Yes, here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html

By default, only one completed checkpoint is kept.

> 3. The pointer to pre-aggregating windows was very helpful, as the
> WindowFunction is then passed the pre-aggregated state. For windows, are
> the Reduce and Fold functions called on each element even before the
> window is triggered? I can see how that would work where the
> pre-computation is done per element but the actual output is emitted only
> when the window fires. But is that only possible if there are no Evictors
> defined on the window? Also, how are the elements fed to the Reduce/Fold
> function? Is it like MapReduce where, even if you are using an Iterator,
> not all the values for a key are actually buffered in memory? This ties
> back to how RocksDB is used to store large window state before the window
> is triggered. If my elements are accumulating in a window (serving a
> ReduceFunction), does the state spill to disk (RocksDB?) when a threshold
> size is reached?

- The function is called before adding the element to the window KV state
- Yes, only possible if no evictors are defined
- The window reduce function is applied directly to the elements of
the stream and then updates the KvState instance (e.g. updates RocksDB);
see the sketch below
- Operations on state always go through RocksDB, which takes care of
spilling etc.
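
To tie this to your third question, here is a rough sketch (types and names are made up, not from your job) of combining an incremental ReduceFunction with a WindowFunction, so the WindowFunction is handed the single pre-aggregated value when the window fires:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class PreAggregatedWindow {

        // 'events' is assumed to be a stream of (name, value) pairs
        public static DataStream<Tuple2<String, Long>> sumPerWindow(DataStream<Tuple2<String, Long>> events) {
            return events
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> e) {
                        return e.f0;
                    }
                })
                .timeWindow(Time.hours(6))
                .apply(
                    new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                            // invoked per element as it arrives; only the running sum is kept in state
                            return new Tuple2<>(a.f0, a.f1 + b.f1);
                        }
                    },
                    new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
                        @Override
                        public void apply(String key, TimeWindow window,
                                          Iterable<Tuple2<String, Long>> preAggregated,
                                          Collector<Tuple2<String, Long>> out) {
                            // called once when the window fires, with the single pre-aggregated value
                            out.collect(preAggregated.iterator().next());
                        }
                    });
        }
    }

With this combination, the per-window state kept in RocksDB stays a single value per key, while the WindowFunction still gets access to the window metadata when it fires.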

Re: Question about Checkpoint Storage (RocksDB)

Sameer Wadkar
Thank you. That clears it up.

I meant savepoints. Sorry, I used the term snapshots in its place :-).

Thanks,
Sameer
