I have a stateful operator in a task which processes thousands of elements per second (per task) when using the Filesystem backend. As documented and reported by other users, when I switch to the RocksDB backend, throughput is considerably lower. I need something that combines the high performance of in-memory with the large state and incremental checkpointing of RocksDB.

For my application, I can *almost* accomplish this by including a caching layer which maintains a map of pending (key, value) writes. These are periodically flushed to RocksDB (and always prior to a checkpoint). This greatly reduces the number of writes to RocksDB and means that I get a "best of both worlds" in terms of throughput and reliability/incremental checkpointing. These optimizations make sense for my workload, since events which operate on the same key tend to be close together in the stream. (Over the long haul, there are many millions of keys, and occasionally an event references some key from the distant past, hence the need for RocksDB.)

Unfortunately, this solution does not work perfectly: when I eventually flush writes to the underlying RocksDB backend, the stateful processor may be operating on an element which belongs to a different key group. Hence, the elements that I flush are associated with the wrong key group, and things don't work quite right.

Is there any way to wrap the RocksDB backend with caching and deferred writes in a way which is "key-group aware"?

Thanks!
David
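For reference, the caching pattern described above boils down to something like the simplified, hypothetical sketch below (the class name, state name, and flush threshold are made up for illustration, and the element is used as its own key). The flush loop is where the key-group problem arises, because the ValueState writes are bound to whatever key the function is currently processing.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CachedCountFunction extends KeyedProcessFunction<String, String, Long> {

    private static final int FLUSH_THRESHOLD = 10_000;

    // In-memory map of pending (key, value) writes, flushed periodically.
    private final Map<String, Long> pendingWrites = new HashMap<>();
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String element, Context ctx, Collector<Long> out) throws Exception {
        // Simplified: the element value is also the key (identity keyBy).
        // Buffer the update in memory instead of hitting RocksDB on every element.
        pendingWrites.merge(element, 1L, Long::sum);

        if (pendingWrites.size() >= FLUSH_THRESHOLD) {
            // BROKEN: countState is bound to the key of *this* element (the context's current key),
            // so every cached entry is written under that single key / key group.
            for (Map.Entry<String, Long> e : pendingWrites.entrySet()) {
                Long previous = countState.value();
                countState.update((previous == null ? 0L : previous) + e.getValue());
            }
            pendingWrites.clear();
        }
    }
}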
Hi David,

The RocksDBKeyedStateBackend is used under a KeyContext: every operation on state must first setCurrentKey, so that the backend knows the current key and can compute the current key group. Those two pieces are how the operator interacts with the underlying RocksDB. I think you can achieve this goal either by calling setCurrentKey before you flush to RocksDB, or by building the prefixed key (key group + key) yourself and putting/getting values directly to/from RocksDB.

Thanks,
Aitozi
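A minimal sketch of the first suggestion, under a few assumptions: the class and state names (CachedCountOperator, "count") are made up, the operator is built on AbstractStreamOperator (which implements KeyContext and therefore exposes setCurrentKey), and prepareSnapshotPreBarrier is available (Flink 1.6+) so that pending writes can be flushed before each checkpoint barrier.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class CachedCountOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<String, Long> {

    private static final int FLUSH_THRESHOLD = 10_000;

    private final Map<String, Long> pendingWrites = new HashMap<>();
    private transient ValueState<Long> countState;

    @Override
    public void open() throws Exception {
        super.open();
        countState = getPartitionedState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        // Buffer the update in memory; only flush to RocksDB periodically.
        pendingWrites.merge(element.getValue(), 1L, Long::sum);
        if (pendingWrites.size() >= FLUSH_THRESHOLD) {
            flushPendingWrites();
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Make sure all deferred writes reach RocksDB before the checkpoint is taken.
        flushPendingWrites();
    }

    private void flushPendingWrites() throws Exception {
        for (Map.Entry<String, Long> e : pendingWrites.entrySet()) {
            // The crucial part: switch the key context so the backend computes the
            // key group from the entry's own key, not from the last processed element.
            setCurrentKey(e.getKey());
            Long previous = countState.value();
            countState.update((previous == null ? 0L : previous) + e.getValue());
        }
        pendingWrites.clear();
        // The framework sets the key context again before the next processElement call,
        // so leaving the current key at the last flushed entry is harmless.
    }
}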
Thanks Aitozi. Your answer makes good sense and I'm trying to implement this now. My code is written as a KeyedProcessFunction, but I can't see where this exposes the KeyContext interface. Is there anything you can point me to in the docs?

Best,
David
Hi David,

I opened an issue about this before, and [hidden email] suggested either extending AbstractStreamOperator to customize how the operator operates on state, or extending the state backend to add a cache layer on top of it. Also, a new state backend was introduced by Alibaba at Flink Forward China 2018; it has not been open sourced, but you can take a look at the slides: https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf

Thanks,
Aitozi
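For the AbstractStreamOperator route, the wiring might look like the hypothetical snippet below (referring to the CachedCountOperator sketched earlier in this thread). The operator has to run on a keyed stream so that a keyed state backend (RocksDB, in this setup) is created for it.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "a", "c", "a")
                // keyBy makes the downstream operator keyed, giving it a keyed state backend.
                .keyBy(k -> k)
                .transform("cached-count", Types.LONG, new CachedCountOperator())
                .print();
        env.execute("cached-count-demo");
    }
}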
Hi, David
When you flush data to the database, you can follow the serialization logic [1] and store the serialized bytes in RocksDB.
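As a rough, hypothetical illustration of that idea (not a drop-in replica of Flink's internal serialization, whose exact layout, prefix width, and namespace encoding are version-dependent), one can compute the key group with KeyGroupRangeAssignment and prepend it to the serialized key bytes:

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public final class KeyGroupPrefixedKeys {

    public static <K> byte[] serializeWithKeyGroupPrefix(
            K key, TypeSerializer<K> keySerializer, int maxParallelism) throws IOException {

        // Same assignment Flink uses to map a key to a key group.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        DataOutputSerializer out = new DataOutputSerializer(32);
        // Two-byte big-endian key-group prefix (sufficient for maxParallelism <= 32768);
        // the real backend chooses one or two prefix bytes based on maxParallelism.
        out.writeShort(keyGroup);
        keySerializer.serialize(key, out);
        return out.getCopyOfBuffer();
    }

    private KeyGroupPrefixedKeys() {}
}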
Best, Congxian