FsStateBackend vs RocksDBStateBackend

FsStateBackend vs RocksDBStateBackend

Ran Zhang
Hi all,

We have a Flink app that uses a KeyedProcessFunction. The function keeps a ValueState holding a TreeSet, and the processElement method needs to read and update it. We tried RocksDB as our state backend, but the performance was not good; intuitively, we think this was because of the serialization / deserialization on each processElement call. We then switched to FsStateBackend (which, according to the docs, keeps in-flight data in the TaskManager’s memory), and that resolved the performance issue. So we want to better understand the tradeoffs in choosing between these two state backends. Our checkpoint size is 200 - 300 GB in stable state. For now, we know one benefit of RocksDB is that it supports incremental checkpoints, but we would love to know what else we lose by choosing FsStateBackend.

Thanks a lot!
Ran Zhang

Re: FsStateBackend vs RocksDBStateBackend

Tzu-Li (Gordon) Tai
Hi Ran,

On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <[hidden email]> wrote:
Hi all,

We have a Flink app that uses a KeyedProcessFunction. The function keeps a ValueState holding a TreeSet, and the processElement method needs to read and update it. We tried RocksDB as our state backend, but the performance was not good; intuitively, we think this was because of the serialization / deserialization on each processElement call.

As you have already pointed out, serialization behaviour is a major difference between the 2 state backends, and will directly impact performance due to the extra runtime overhead in RocksDB.
If you plan to continue using the RocksDB state backend, make sure to use MapState instead of ValueState where possible, since every access to the ValueState in the RocksDB backend requires serializing / deserializing the whole value.
For MapState, de-/serialization happens per K-V access. Whether or not this makes sense would of course depend on your state access pattern.
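To make the difference concrete, here is a minimal, self-contained sketch. It is a toy model, not Flink code: it uses plain Java serialization as a stand-in for Flink's serializers, and the class and method names (StateAccessCostSketch, wholeValueBytes, perEntryBytes) are hypothetical. It only counts how many bytes get written when a growing set is rewritten in full on every update (the ValueState pattern) versus when only the touched entry is written (the MapState pattern, with set elements stored as map keys).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.TreeSet;

/** Toy model: counts bytes serialized per update. Not Flink's actual serializers. */
class StateAccessCostSketch {

    /** Serializes an object with plain Java serialization and returns the byte count. */
    static int serializedSize(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** ValueState-like pattern: the whole set is written again after every insert. */
    static long wholeValueBytes(int inserts) {
        TreeSet<Long> set = new TreeSet<>();
        long total = 0;
        for (long i = 0; i < inserts; i++) {
            set.add(i);
            total += serializedSize(set);
        }
        return total;
    }

    /** MapState-like pattern: only the touched entry (key plus a placeholder value) is written. */
    static long perEntryBytes(int inserts) {
        long total = 0;
        for (long i = 0; i < inserts; i++) {
            total += serializedSize(Long.valueOf(i)); // the set element, used as map key
            total += serializedSize(Boolean.TRUE);    // placeholder map value
        }
        return total;
    }

    public static void main(String[] args) {
        int inserts = 1_000;
        System.out.println("whole-value bytes: " + wholeValueBytes(inserts));
        System.out.println("per-entry bytes:   " + perEntryBytes(inserts));
    }
}
```

In this model the whole-value cost grows roughly quadratically with the number of inserts, while the per-entry cost grows linearly. Real numbers with Flink's serializers and RocksDB will differ, and the model ignores the read side entirely.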
 
We then switched to FsStateBackend (which, according to the docs, keeps in-flight data in the TaskManager’s memory), and that resolved the performance issue. So we want to better understand the tradeoffs in choosing between these two state backends. Our checkpoint size is 200 - 300 GB in stable state. For now, we know one benefit of RocksDB is that it supports incremental checkpoints, but we would love to know what else we lose by choosing FsStateBackend.

As of now, feature-wise both backends support asynchronous snapshotting, state schema evolution, and access via the State Processor API.
In the end, the major factor for deciding between the two state backends would be your expected state size.
That being said, it could be possible in the future that savepoint formats for the backends are changed to be compatible, meaning that you will be able to switch between different backends upon restore [1].
 

Re: FsStateBackend vs RocksDBStateBackend

Ran Zhang
Hi Gordon,

Thanks for your reply! Regarding state size: we are at 200-300 GB, but we run with a parallelism of 120, so each task handles ~2 - 3 GB of state. (When we submit the job, we set the TM memory to 15g.) In this scenario, which state backend would be the best fit?

Thanks,
Ran

Re: FsStateBackend vs RocksDBStateBackend

rmetzger0
I would try the FsStateBackend in this scenario, as you have enough memory available.

Re: FsStateBackend vs RocksDBStateBackend

Yu Li
Yes, FsStateBackend would be the best fit for state access performance in this case. Just a reminder that FsStateBackend uploads the full dataset to DFS during checkpointing, so please watch the network bandwidth usage and make sure it doesn't become a new bottleneck.

Best Regards,
Yu
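
As a rough way to size this, the following back-of-the-envelope sketch uses the numbers from the thread (200 - 300 GB of state, parallelism 120). The 10-minute upload window is a hypothetical target rather than anything stated in the thread, and the class and method names are made up for illustration. It also assumes keys are spread evenly, and the on-heap footprint of the state can differ from the serialized checkpoint size.

```java
/** Back-of-the-envelope sizing. Inputs other than state size and parallelism are assumptions. */
class CheckpointSizingSketch {

    /** Average state per parallel subtask in GB, assuming keys are spread evenly. */
    static double stateGbPerSubtask(double totalStateGb, int parallelism) {
        return totalStateGb / parallelism;
    }

    /** Aggregate upload rate in MB/s needed to write one full snapshot within the given time. */
    static double requiredUploadMbPerSec(double totalStateGb, double uploadSeconds) {
        return totalStateGb * 1024.0 / uploadSeconds;
    }

    public static void main(String[] args) {
        int parallelism = 120;      // from the thread
        double uploadSeconds = 600; // hypothetical: finish each full checkpoint within 10 minutes
        for (double totalGb : new double[] {200, 300}) { // state size range from the thread
            System.out.printf("%.0f GB total: %.2f GB per subtask, %.0f MB/s aggregate upload%n",
                    totalGb,
                    stateGbPerSubtask(totalGb, parallelism),
                    requiredUploadMbPerSec(totalGb, uploadSeconds));
        }
    }
}
```

With incremental checkpoints (RocksDB only, as noted earlier in the thread), the amount uploaded per checkpoint would depend on how much state changed rather than on the full size.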


Re: FsStateBackend vs RocksDBStateBackend

rmetzger0
Sorry for the late reply.

There's not much you can do at the moment, as Flink needs to sync on the checkpoint barriers.
There's something in the making for addressing the issue soon: https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

Did you try out using the FsStateBackend?
If you are going to stick with RocksDB, I would recommend finding out what exactly causes the poor performance. I see the following areas:
- serialization costs
- disk / ssd speed
- network speed (during checkpoint creation) (as Yu mentioned)
- if you have asynchronous checkpoints enabled, they will also slow down the processing.


On Sun, Feb 23, 2020 at 8:27 PM Chen Qin <[hidden email]> wrote:
Just to follow up on this thread: it was actually caused by key skew. Given that a single subtask is single-threaded, 5% of slow processing causes the entire job to back-pressure on RocksDBStateBackend.

Robert,

What is blocking us from enabling multi-threading in the processor? I recall it has something to do with barriers and keeping records in order. Can you share more insights on this?

Chen
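
The key-skew effect described above can be illustrated with a small toy model. This is a sketch under stated assumptions: routing a key by hash modulo parallelism is a simplification and not Flink's actual key-group assignment, the 5% hot-key share is a hypothetical workload, and all names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of keyed partitioning. Hash-modulo routing is a simplification, not Flink's key groups. */
class KeySkewSketch {

    /** Routes a key to a subtask index (simplified). */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many records each subtask receives. */
    static int[] recordsPerSubtask(List<String> keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : keys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    /** Builds a workload where hotRecords share one key and the rest use distinct keys. */
    static List<String> skewedKeys(int hotRecords, int otherRecords) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < hotRecords; i++) {
            keys.add("hot-key");
        }
        for (int i = 0; i < otherRecords; i++) {
            keys.add("key-" + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        int parallelism = 120; // from the thread
        // Hypothetical workload: 5% of all records share a single key.
        int[] counts = recordsPerSubtask(skewedKeys(5_000, 95_000), parallelism);
        int hot = subtaskFor("hot-key", parallelism);
        System.out.println("hot subtask " + hot + " handles " + counts[hot] + " records");
        System.out.println("an even share would be about " + (100_000 / parallelism));
    }
}
```

Because every record for a given key lands on the same subtask, the subtask that owns the hot key handles several times the even share, and with one processing thread per subtask it sets the pace for everything upstream.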
