State Storage Questions

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

State Storage Questions

Rex Fenley
Hello!

I've been digging into State Storage documentation, but it's left me scratching my head with a few questions. Any help will be much appreciated.

Qs:
1. Is there a way to use RocksDB state backend for Flink on AWS EMR? Possibly with S3 backed savepoints for recovery (or maybe hdfs for savepoints?)? Only documentation related to AWS I can find makes it look like AWS must use the S3 File System state backend and not RocksDB at all. https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html

2. Does the FS state backend not compact? I thought everything in Flink was stored as key/value. In which case, why would the last n values for a key need to stick around, or how would they?
> An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.

3. In the docs, Operators are referred to as non-keyed state, yet, Operators have IDs that they are keyed by, so why are they referred to as non-keyed state? https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids

4. For the Table API / SQL are primary keys and join keys automatically used as the keys for state under the hood?

Lastly
5. Is there a way to estimate roughly how much disk space state storage will take per operation?

Thanks again!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: State Storage Questions

Tzu-Li (Gordon) Tai
Hi,

On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <[hidden email]> wrote:
Hello!

I've been digging into State Storage documentation, but it's left me scratching my head with a few questions. Any help will be much appreciated.

Qs:
1. Is there a way to use RocksDB state backend for Flink on AWS EMR? Possibly with S3 backed savepoints for recovery (or maybe hdfs for savepoints?)? Only documentation related to AWS I can find makes it look like AWS must use the S3 File System state backend and not RocksDB at all. https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html

I think there's some misunderstanding of the role of RocksDB vs filesystems for fault-tolerance here.
RocksDB is a state backend option that manages user state out-of-core, and is managed by the Flink runtime. Users do not need to separately manage RocksDB instances.
For persistence of that state as checkpoints / savepoints for fault-tolerance, you may choose the commonly used filesystems like S3 / HDFS.

See [1] for how to configure your job to use RocksDBStateBackend as the runtime state backend and configuring a filesystem path for persistence.
 

2. Does the FS state backend not compact? I thought everything in Flink was stored as key/value. In which case, why would the last n values for a key need to stick around, or how would they?
> An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.


The sentence that you quote simply states how Flink leverages RocksDB's background compaction of SSTables to ensure that incremental checkpoints don't grow indefinitely in size.
This has nothing to do with the FsStateBackend, as incremental checkpointing isn't supported there.

Just as a clarification as there might be some other misunderstanding here:
The difference between FsStateBackend v.s. RocksDBStateBackend is the state backend being used to maintain local state at runtime.
RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses in-memory hash maps. For persistence, both are checkpointed to a filesystem for fault-tolerance.
The naming may be a bit confusing, so just wanted to clarify that here in case that may have caused any confusion with the questions above.
 
3. In the docs, Operators are referred to as non-keyed state, yet, Operators have IDs that they are keyed by, so why are they referred to as non-keyed state? https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids


Operator state is referred to as non-keyed state because it is not co-partitioned with the stream by key and not values are not bound to single key (i.e. when you access keyed state, the access is bound to a single key), and have different schemes for repartitioning when operators are scaled up or down.
The operator IDs you referred to are simply a unique ID to identify the same operators across different executions of the same job. I'm not sure what you mean by "operators have IDs that are keyed by"; those IDs are not used in any partitioning operation.

 
4. For the Table API / SQL are primary keys and join keys automatically used as the keys for state under the hood?

Yes.
 

Lastly
5. Is there a way to estimate roughly how much disk space state storage will take per operation?
 
Thanks again!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Cheers,
Gordon 
Reply | Threaded
Open this post in threaded view
|

Re: State Storage Questions

Rex Fenley
This is so helpful, thank you!

So just to clarify (3), Operator state has a partitioning scheme, but it's simply not by key, it's something else that's special under-the-hood? In which case, what data is stored in an Operator? I assumed it must be the input data for e.g. a join, so that it can react efficiently to any data changes in the stream and recombine only what has actually changed. Is this correct?

On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <[hidden email]> wrote:
Hello!

I've been digging into State Storage documentation, but it's left me scratching my head with a few questions. Any help will be much appreciated.

Qs:
1. Is there a way to use RocksDB state backend for Flink on AWS EMR? Possibly with S3 backed savepoints for recovery (or maybe hdfs for savepoints?)? Only documentation related to AWS I can find makes it look like AWS must use the S3 File System state backend and not RocksDB at all. https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html

I think there's some misunderstanding of the role of RocksDB vs filesystems for fault-tolerance here.
RocksDB is a state backend option that manages user state out-of-core, and is managed by the Flink runtime. Users do not need to separately manage RocksDB instances.
For persistence of that state as checkpoints / savepoints for fault-tolerance, you may choose the commonly used filesystems like S3 / HDFS.

See [1] for how to configure your job to use RocksDBStateBackend as the runtime state backend and configuring a filesystem path for persistence.
 

2. Does the FS state backend not compact? I thought everything in Flink was stored as key/value. In which case, why would the last n values for a key need to stick around, or how would they?
> An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.


The sentence that you quote simply states how Flink leverages RocksDB's background compaction of SSTables to ensure that incremental checkpoints don't grow indefinitely in size.
This has nothing to do with the FsStateBackend, as incremental checkpointing isn't supported there.

Just as a clarification as there might be some other misunderstanding here:
The difference between FsStateBackend v.s. RocksDBStateBackend is the state backend being used to maintain local state at runtime.
RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses in-memory hash maps. For persistence, both are checkpointed to a filesystem for fault-tolerance.
The naming may be a bit confusing, so just wanted to clarify that here in case that may have caused any confusion with the questions above.
 
3. In the docs, Operators are referred to as non-keyed state, yet, Operators have IDs that they are keyed by, so why are they referred to as non-keyed state? https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids


Operator state is referred to as non-keyed state because it is not co-partitioned with the stream by key and not values are not bound to single key (i.e. when you access keyed state, the access is bound to a single key), and have different schemes for repartitioning when operators are scaled up or down.
The operator IDs you referred to are simply a unique ID to identify the same operators across different executions of the same job. I'm not sure what you mean by "operators have IDs that are keyed by"; those IDs are not used in any partitioning operation.

 
4. For the Table API / SQL are primary keys and join keys automatically used as the keys for state under the hood?

Yes.
 

Lastly
5. Is there a way to estimate roughly how much disk space state storage will take per operation?
 
Thanks again!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Cheers,
Gordon 


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: State Storage Questions

Tzu-Li (Gordon) Tai
Hi!

Operator state is bound to a single parallel operator instance; there is no partitioning happening here.
It is typically used in Flink source and sink operators. For example, the Flink Kafka source operator's parallel instances maintain as operator state a mapping of partitions to offsets for the partitions that it is assigned to. For state like these, it isn't partitionable by any key associated with an input DataStream.

Since there is no partitioning scheme, redistribution of the state on operator rescale also happens differently compared to keyed state.
Take for example a ListState; in contrast to a keyed ListState, an Operator ListState is a collection of state items that are independent from each other and eligible for redistribution across operator instances in the event of a rescale (by default, Flink uses simple round-robin for the redistribution).
In other words, the list entries are the finest granularity at which the operator state can be redistributed, and should not be correlated with each other since each entry of the list may end up in different parallel operator instances on rescale.

In general, there should rarely be a need to use operator state for typical user applications. It isn't massively scalable and usually is small in size.

Cheers,
Gordon

On Sat, Sep 5, 2020 at 12:26 AM Rex Fenley <[hidden email]> wrote:
This is so helpful, thank you!

So just to clarify (3), Operator state has a partitioning scheme, but it's simply not by key, it's something else that's special under-the-hood? In which case, what data is stored in an Operator? I assumed it must be the input data for e.g. a join, so that it can react efficiently to any data changes in the stream and recombine only what has actually changed. Is this correct?

On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <[hidden email]> wrote:
Hello!

I've been digging into State Storage documentation, but it's left me scratching my head with a few questions. Any help will be much appreciated.

Qs:
1. Is there a way to use RocksDB state backend for Flink on AWS EMR? Possibly with S3 backed savepoints for recovery (or maybe hdfs for savepoints?)? Only documentation related to AWS I can find makes it look like AWS must use the S3 File System state backend and not RocksDB at all. https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html

I think there's some misunderstanding of the role of RocksDB vs filesystems for fault-tolerance here.
RocksDB is a state backend option that manages user state out-of-core, and is managed by the Flink runtime. Users do not need to separately manage RocksDB instances.
For persistence of that state as checkpoints / savepoints for fault-tolerance, you may choose the commonly used filesystems like S3 / HDFS.

See [1] for how to configure your job to use RocksDBStateBackend as the runtime state backend and configuring a filesystem path for persistence.
 

2. Does the FS state backend not compact? I thought everything in Flink was stored as key/value. In which case, why would the last n values for a key need to stick around, or how would they?
> An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.


The sentence that you quote simply states how Flink leverages RocksDB's background compaction of SSTables to ensure that incremental checkpoints don't grow indefinitely in size.
This has nothing to do with the FsStateBackend, as incremental checkpointing isn't supported there.

Just as a clarification as there might be some other misunderstanding here:
The difference between FsStateBackend v.s. RocksDBStateBackend is the state backend being used to maintain local state at runtime.
RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses in-memory hash maps. For persistence, both are checkpointed to a filesystem for fault-tolerance.
The naming may be a bit confusing, so just wanted to clarify that here in case that may have caused any confusion with the questions above.
 
3. In the docs, Operators are referred to as non-keyed state, yet, Operators have IDs that they are keyed by, so why are they referred to as non-keyed state? https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids


Operator state is referred to as non-keyed state because it is not co-partitioned with the stream by key and not values are not bound to single key (i.e. when you access keyed state, the access is bound to a single key), and have different schemes for repartitioning when operators are scaled up or down.
The operator IDs you referred to are simply a unique ID to identify the same operators across different executions of the same job. I'm not sure what you mean by "operators have IDs that are keyed by"; those IDs are not used in any partitioning operation.

 
4. For the Table API / SQL are primary keys and join keys automatically used as the keys for state under the hood?

Yes.
 

Lastly
5. Is there a way to estimate roughly how much disk space state storage will take per operation?
 
Thanks again!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Cheers,
Gordon 


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: State Storage Questions

Rex Fenley
Thanks a bunch!

>For example, the Flink Kafka source operator's parallel instances maintain as operator state a mapping of partitions to offsets for the partitions that it is assigned to.

This I think clarifies things. This is literally state for the operator to do its job, not really row data. The Table API/SQL will use "Keyed State" for rows entirely separately.

Thanks!

On Mon, Sep 7, 2020 at 11:51 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi!

Operator state is bound to a single parallel operator instance; there is no partitioning happening here.
It is typically used in Flink source and sink operators. For example, the Flink Kafka source operator's parallel instances maintain as operator state a mapping of partitions to offsets for the partitions that it is assigned to. For state like these, it isn't partitionable by any key associated with an input DataStream.

Since there is no partitioning scheme, redistribution of the state on operator rescale also happens differently compared to keyed state.
Take for example a ListState; in contrast to a keyed ListState, an Operator ListState is a collection of state items that are independent from each other and eligible for redistribution across operator instances in the event of a rescale (by default, Flink uses simple round-robin for the redistribution).
In other words, the list entries are the finest granularity at which the operator state can be redistributed, and should not be correlated with each other since each entry of the list may end up in different parallel operator instances on rescale.

In general, there should rarely be a need to use operator state for typical user applications. It isn't massively scalable and usually is small in size.

Cheers,
Gordon

On Sat, Sep 5, 2020 at 12:26 AM Rex Fenley <[hidden email]> wrote:
This is so helpful, thank you!

So just to clarify (3), Operator state has a partitioning scheme, but it's simply not by key, it's something else that's special under-the-hood? In which case, what data is stored in an Operator? I assumed it must be the input data for e.g. a join, so that it can react efficiently to any data changes in the stream and recombine only what has actually changed. Is this correct?

On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <[hidden email]> wrote:
Hello!

I've been digging into State Storage documentation, but it's left me scratching my head with a few questions. Any help will be much appreciated.

Qs:
1. Is there a way to use RocksDB state backend for Flink on AWS EMR? Possibly with S3 backed savepoints for recovery (or maybe hdfs for savepoints?)? Only documentation related to AWS I can find makes it look like AWS must use the S3 File System state backend and not RocksDB at all. https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html

I think there's some misunderstanding of the role of RocksDB vs filesystems for fault-tolerance here.
RocksDB is a state backend option that manages user state out-of-core, and is managed by the Flink runtime. Users do not need to separately manage RocksDB instances.
For persistence of that state as checkpoints / savepoints for fault-tolerance, you may choose the commonly used filesystems like S3 / HDFS.

See [1] for how to configure your job to use RocksDBStateBackend as the runtime state backend and configuring a filesystem path for persistence.
 

2. Does the FS state backend not compact? I thought everything in Flink was stored as key/value. In which case, why would the last n values for a key need to stick around, or how would they?
> An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.


The sentence that you quote simply states how Flink leverages RocksDB's background compaction of SSTables to ensure that incremental checkpoints don't grow indefinitely in size.
This has nothing to do with the FsStateBackend, as incremental checkpointing isn't supported there.

Just as a clarification as there might be some other misunderstanding here:
The difference between FsStateBackend v.s. RocksDBStateBackend is the state backend being used to maintain local state at runtime.
RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses in-memory hash maps. For persistence, both are checkpointed to a filesystem for fault-tolerance.
The naming may be a bit confusing, so just wanted to clarify that here in case that may have caused any confusion with the questions above.
 
3. In the docs, Operators are referred to as non-keyed state, yet, Operators have IDs that they are keyed by, so why are they referred to as non-keyed state? https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids


Operator state is referred to as non-keyed state because it is not co-partitioned with the stream by key and not values are not bound to single key (i.e. when you access keyed state, the access is bound to a single key), and have different schemes for repartitioning when operators are scaled up or down.
The operator IDs you referred to are simply a unique ID to identify the same operators across different executions of the same job. I'm not sure what you mean by "operators have IDs that are keyed by"; those IDs are not used in any partitioning operation.

 
4. For the Table API / SQL are primary keys and join keys automatically used as the keys for state under the hood?

Yes.
 

Lastly
5. Is there a way to estimate roughly how much disk space state storage will take per operation?
 
Thanks again!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Cheers,
Gordon 


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US