RocksDB backend with deferred writes?

David J. C. Beach
I have a stateful operator in a task which processes thousands of elements per second (per task) when using the Filesystem backend.  As documented and reported by other users, when I switch to the RocksDB backend, throughput is considerably lower.  I need something that combines the high performance of in-memory with the large state and incremental checkpointing of RocksDB.

For my application, I can *almost* accomplish this by including a caching layer which maintains a map of pending (key, value) writes. These are periodically flushed to RocksDB (and always prior to a checkpoint). This greatly reduces the number of writes to RocksDB, and means that I can get a "best of both worlds" in terms of throughput and reliability/incremental checkpointing. These optimizations make sense for my workload, since events which operate on the same key tend to be close together in the stream. (Over the long haul, there are many millions of keys, and occasionally, an event references some key from the distant past, hence the need for RocksDB.)
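For readers following along, the caching layer described above could look roughly like the sketch below. It is only an illustration, not the code from this application: the class `WriteBackCache`, its methods, and the `sink` callback (which stands in for the actual write to RocksDB-backed state) are all hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical write-back cache: keeps only the latest pending value per key. */
final class WriteBackCache<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final int maxPending;
    // Stand-in for the real write to RocksDB-backed state (an assumption of this sketch).
    private final BiConsumer<K, V> sink;

    WriteBackCache(int maxPending, BiConsumer<K, V> sink) {
        this.maxPending = maxPending;
        this.sink = sink;
    }

    /** Buffers a write; repeated writes to the same key overwrite each other in memory. */
    void put(K key, V value) {
        pending.put(key, value);
        if (pending.size() >= maxPending) {
            flush();
        }
    }

    /** Returns the buffered value, or null if the caller must read the backing store. */
    V getPending(K key) {
        return pending.get(key);
    }

    /** Writes every pending entry to the sink; meant to be called before each checkpoint. */
    void flush() {
        pending.forEach(sink);
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        WriteBackCache<String, Integer> cache =
                new WriteBackCache<>(1000, (k, v) -> System.out.println("write " + k + "=" + v));
        cache.put("a", 1);
        cache.put("a", 2); // coalesced with the previous write to "a"
        cache.put("b", 5);
        cache.flush();     // issues two writes instead of three
    }
}
```

The saving comes from coalescing: when events for the same key arrive close together, only the last value per key reaches the sink at flush time.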

Unfortunately, this solution does not work perfectly because when I do eventually flush writes to the underlying RocksDB backend, the stateful processor may be operating on an element which belongs to a different key group.  Hence, the elements that I flush are associated with the wrong key group, and things don't work quite right.

Is there any way to wrap the RocksDB backend with caching and deferred writes (in a way which is "key-group aware")?

Thanks!

David

Re: RocksDB backend with deferred writes?

aitozi

Hi David,

The RocksDB keyed state backend operates under a KeyContext: before every state operation, setCurrentKey must be called so that the backend knows the current key and can compute the current key group. It then uses these two parts (key group and key) to interact with the underlying RocksDB.

I think you can achieve your goal either by calling setCurrentKey before flushing each entry to RocksDB, or by building the prefixed key (key group + key) yourself and putting/getting values to/from RocksDB directly.
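A minimal sketch of the first option, switching the key context before each flushed write, is shown below. It does not use Flink classes: `FakeKeyedBackend` and `KeyAwareFlusher` are hypothetical stand-ins, and the key-group formula is a simplification (Flink additionally applies a murmur hash to the key's hashCode before taking the modulo).

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a keyed backend: writes always go to the current key. */
final class FakeKeyedBackend {
    private final int maxParallelism;
    private final Map<Integer, Map<String, Long>> groups = new HashMap<>();
    private String currentKey;

    FakeKeyedBackend(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    void setCurrentKey(String key) {
        currentKey = key;
    }

    String getCurrentKey() {
        return currentKey;
    }

    // Simplified key-group assignment (assumption: no murmur hash step).
    int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Stores the value under the current key, inside the current key's group. */
    void update(long value) {
        groups.computeIfAbsent(keyGroupOf(currentKey), g -> new HashMap<>())
              .put(currentKey, value);
    }

    Long read(String key) {
        Map<String, Long> group = groups.get(keyGroupOf(key));
        return group == null ? null : group.get(key);
    }
}

/** Buffers writes and flushes each one under its own key context. */
final class KeyAwareFlusher {
    private final Map<String, Long> pending = new LinkedHashMap<>();
    private final FakeKeyedBackend backend;

    KeyAwareFlusher(FakeKeyedBackend backend) {
        this.backend = backend;
    }

    void buffer(String key, long value) {
        pending.put(key, value);
    }

    void flush() {
        String saved = backend.getCurrentKey();
        try {
            for (Map.Entry<String, Long> e : pending.entrySet()) {
                backend.setCurrentKey(e.getKey()); // switch context to the buffered key
                backend.update(e.getValue());
            }
        } finally {
            backend.setCurrentKey(saved);          // restore the element's key context
            pending.clear();
        }
    }

    public static void main(String[] args) {
        FakeKeyedBackend backend = new FakeKeyedBackend(128);
        KeyAwareFlusher flusher = new KeyAwareFlusher(backend);
        flusher.buffer("user-1", 10L);
        backend.setCurrentKey("user-3"); // an element for another key is being processed
        flusher.flush();
        System.out.println(backend.read("user-1") + " " + backend.read("user-3"));
    }
}
```

Restoring the saved key in the `finally` block matters because the flush can run while an element for a different key is being processed; without it, later state accesses for that element would run under the last flushed key.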

 

Thanks,

Aitozi

(In reply to David J. C. Beach's message of Monday, April 29, 2019, 6:43 AM.)

Re: RocksDB backend with deferred writes?

David J. C. Beach
Thanks Aitozi.

Your answer makes good sense and I'm trying to implement this now.  My code is written as a KeyedProcessFunction, but I can't see where this exposes the KeyContext interface.  Is there anything you can point me to in the docs?

Best,
David



(In reply to aitozi's message of Sun, Apr 28, 2019 at 8:09 PM.)


Re: RocksDB backend with deferred writes?

aitozi

Hi David,

I previously opened an issue about this, and [hidden email] [hidden email] suggested that I either extend AbstractStreamOperator to customize how the operator works with state, or extend the state backend to add a cache layer on top of it.

FYI: https://issues.apache.org/jira/browse/FLINK-10343?focusedCommentId=16614992&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16614992

Also, a new state backend was presented by Alibaba at Flink Forward China 2018, but it has not been open-sourced. You can take a look at the slides:

https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf

Thanks,

Aitozi

(In reply to David J. C. Beach's message of Monday, April 29, 2019, 11:52 AM.)

Re: RocksDB backend with deferred writes?

Congxian Qiu

Best, Congxian
(In reply to aitozi's message of Apr 29, 2019, 12:57 +0800.)