Flink long state TTL Concerns

Flink long state TTL Concerns

Matt Magsombol
Suppose I'm using state stored in memory with a TTL of at most 7 days. Should I expect any issues with state this long-lived, other than a potential OOM?

Suppose I extend this by adding RocksDB. Are there any concerns with respect to maintenance?

Most of the examples I've seen pair state with time windows, but I'll only read from this state every 15 seconds (or some other small time window). After each time window, I *won't* be cleaning up the data in the state, because I'll need to look it up again in future time windows. I'll effectively rely on TTL-based key expiration, and I was wondering what potential issues I should watch out for.

Re: Flink long state TTL Concerns

Andrey Zagrebin
Hi Matt,

Generally speaking, using state with TTL in Flink should not differ much from using state in Flink in general [1].
You have to provision your system so that it can hold 7 days' worth of state.

The existing Flink state backends provide background cleanup that eventually removes expired state automatically,
so your application does not need to access the expired state explicitly to clean it up.
The background cleanup has been active by default since Flink 1.10 [2].

Enabling TTL for state, of course, comes at a price, because you need to store a timestamp and spend CPU cycles on the background cleanup.
This affects storage size and potentially the processing latency per record.
You can read about details and caveats in the docs: for heap state [3] and RocksDB [4].
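
As a rough illustration of the provisioning point above, here is a back-of-the-envelope sketch in plain Python (not Flink code). The workload numbers (100 new keys per second, 200 bytes per entry) are made up, and the 8-byte timestamp overhead is an assumption that follows what the docs describe for RocksDB; the heap backend's per-entry overhead is larger and is not modeled.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def estimate_ttl_state_bytes(new_keys_per_sec, avg_entry_bytes, ttl_days,
                             ttl_timestamp_bytes=8):
    """Rough upper bound on live state, assuming every key is written once
    and then lives for the full TTL before it is cleaned up."""
    live_entries = new_keys_per_sec * ttl_days * SECONDS_PER_DAY
    return live_entries * (avg_entry_bytes + ttl_timestamp_bytes)


# Hypothetical workload: 100 new keys/s, 200-byte entries, 7-day TTL.
total_bytes = estimate_ttl_state_bytes(100, 200, 7)
print(f"~{total_bytes / 1e9:.1f} GB of state, before any backend overhead")
```

With these made-up numbers the estimate comes to roughly 12.6 GB, which is the kind of figure to compare against the memory or disk available per TaskManager.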

Best,
Andrey


In reply to Matt Magsombol's message of Thu, Mar 19, 2020 at 6:48 PM.

Re: Flink long state TTL Concerns

Matt Magsombol
I see...
We run our setup in Kubernetes, with one cluster running one job.
The total parallelism of the cluster equals the number of TaskManagers, and each TaskManager has 1 CPU core and 1 slot.
If we add state TTL, do you have a recommendation for how much I should increase the CPU per TaskManager? For example, 2 cores per TaskManager with 1 slot each, so that the other core can be used for TTL state cleanup?
Or is that overkill?

In reply to Andrey Zagrebin's message of Thu, Mar 19, 2020 at 12:56 PM.

Re: Flink long state TTL Concerns

Matt Magsombol
Also, a follow-up question about state cleanup:
I see that there's an incremental cleanup option.
Its notes indicate that if the state is never accessed and no records are processed, expired state persists...

Just to clarify, suppose I have a key named "A" with a TTL of 1 hour, and "A" was last updated at the 30th minute.
At any time after the 1-hour mark, if I never receive a message with key "A" and so never query it from state, does that mean
"A" will just hang around? Or will it eventually get cleaned up?

I see that RocksDB runs an async compaction. In this scenario, key "A" will eventually be cleaned up even if we don't access or update it after that 1-hour TTL, right?

I just want to make sure that keys that were last updated longer ago than the TTL, and never touched since, are
eventually cleaned up without my having to "read" them. It sounds like RocksDB cleans these up via compaction, but what about state in FsStateBackend, where in-flight data lives on the heap?

In reply to Matthew Rafael Magsombol's message of Thu, Mar 19, 2020 at 7:07 PM.

Re: Flink long state TTL Concerns

Matt Magsombol
And one more follow-up!
(Sorry for all the follow-ups! We've run Flink consumers before, but only very basic ones without state.)

Suppose I want to use a MapState[String, <Thrift_object>]...
To make that work, following this link should suffice, right? https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html
And suppose that after this state access I want to emit a variant of this <Thrift_object> to the next operator. I presume I then need to tell Flink how to serialize this message via a Kryo serializer: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html ?

(This is a bit off-topic by now!)

In reply to Matthew Rafael Magsombol's message of Thu, Mar 19, 2020 at 11:25 PM.

Re: Flink long state TTL Concerns

Andrey Zagrebin
Resources:

If you use the heap state backend, the cleanup happens while processing records, in the same thread, so there is a direct connection with the number of cores.
If you use the RocksDB state backend, extra CPUs can be used by async compaction, which should speed up the background cleanup.
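
A toy model may help picture the RocksDB side. The sketch below is plain Python, not Flink or RocksDB code: `compact` is a hypothetical stand-in for a compaction pass with a TTL filter, which drops expired entries as it rewrites data, without any read from the job. In the real backend this kind of work runs on background threads, which is why spare CPUs can help; when a compaction actually reaches a given entry is not modeled here.

```python
def compact(entries, now, ttl):
    """Toy compaction pass with a TTL filter (hypothetical, not RocksDB's API).

    `entries` maps key -> (value, last_update_time). Entries whose TTL has
    elapsed are dropped while the data is rewritten; nobody has to read them.
    """
    return {
        key: (value, last_update)
        for key, (value, last_update) in entries.items()
        if now - last_update < ttl
    }


# Times are in minutes; the TTL is 60 minutes.
store = {"A": ("a-value", 30), "B": ("b-value", 100)}
store = compact(store, now=120, ttl=60)
print(sorted(store))
```

After the pass at minute 120, only "B" remains: "A" was last updated at minute 30, so its 60-minute TTL ran out at minute 90.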

Incremental cleanup semantics:

> if no access happens to that state
This means no access to any key, i.e. practically no data hits the pipeline and no processing is happening. As mentioned, heap cleanup happens while processing records or accessing any state key, not necessarily the key that needs cleanup.

> I want to make sure that those keys are eventually cleaned up without having to "read" from them.
I think this should be OK for both backends. Just for heap, if the pipeline stalls, the cleanup is on hold as well.
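
To make the heap semantics concrete, here is a toy model in plain Python. It is not Flink code: `ToyHeapTtlState`, `cleanup_size`, and the minute-based clock are invented for illustration. It mimics the idea that every state access also checks a few entries from a global iterator, so an expired key can be removed without being read itself, but only while records keep arriving.

```python
class ToyHeapTtlState:
    """Toy model of incremental TTL cleanup on a heap-like backend."""

    def __init__(self, ttl, cleanup_size):
        self.ttl = ttl
        self.cleanup_size = cleanup_size  # entries checked per state access
        self.entries = {}                 # key -> (value, last_update_time)
        self._cursor = 0                  # position of the global cleanup iterator

    def _cleanup_step(self, now):
        keys = list(self.entries)
        for _ in range(min(self.cleanup_size, len(keys))):
            key = keys[self._cursor % len(keys)]
            self._cursor += 1
            _, last_update = self.entries[key]
            if now - last_update >= self.ttl:
                del self.entries[key]

    def update(self, key, value, now):
        # Cleanup piggybacks on any access, not only accesses to expired keys.
        self._cleanup_step(now)
        self.entries[key] = (value, now)


# Times are in minutes; "A" has a 60-minute TTL and is last updated at minute 30.
state = ToyHeapTtlState(ttl=60, cleanup_size=5)
state.update("A", "a-value", now=30)

# The pipeline stalls: nothing touches the state, so "A" is still stored at
# minute 200 even though its TTL ran out at minute 90.
stale_while_stalled = "A" in state.entries

# A record for a different key arrives; the cleanup step removes "A".
state.update("B", "b-value", now=200)
print(stale_while_stalled, "A" in state.entries)
```

The toy only tracks what is stored; it does not model whether an expired but not yet cleaned entry is visible to reads.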

Serialization:

I am not a deep expert here, and I am not sure I fully follow the problem.
I would suggest starting another thread with more details so that it is visible to other people as well.

In general, what you see in the docs about serializers is important for state as well.
So you have to be aware of which objects you keep in state and exchange between operators, and make sure they are serializable by Flink.
Pay attention to migration if you want to evolve the types of your objects; it is not a trivial topic [1].

Best,
Andrey

In reply to Matthew Rafael Magsombol's message of Fri, Mar 20, 2020 at 9:39 AM.

Re: Flink long state TTL Concerns

Matthew Magsombol
Gotcha, ok
Thanks! I think this is everything I need to know for now!
I can avoid using Thrift as a state data type by storing a generic Flink data type and converting it to the Thrift type on read, before passing it to my sink.
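
A minimal sketch of that workaround, in plain Python rather than Flink code: the state holds only a plain tuple, and the conversion happens on read. `UserEvent` is a hypothetical stand-in for the generated Thrift class, and the dict stands in for a MapState; none of these names come from Flink or Thrift.

```python
from dataclasses import dataclass


@dataclass
class UserEvent:
    """Hypothetical stand-in for a generated Thrift object."""
    user_id: str
    count: int


def to_state_value(event):
    # Keep only plain built-in types in state, so the state type stays generic.
    return (event.user_id, event.count)


def from_state_value(raw):
    # Rebuild the richer object on read, just before emitting to the sink.
    user_id, count = raw
    return UserEvent(user_id=user_id, count=count)


map_state = {}  # stand-in for MapState[String, <generic type>]
map_state["user-1"] = to_state_value(UserEvent("user-1", 3))
restored = from_state_value(map_state["user-1"])
print(restored)
```

The trade-off is an extra conversion on every read in exchange for not having to register a serializer for the Thrift type in state.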


In reply to Andrey Zagrebin's message of Fri, Mar 20, 2020 at 1:15 AM.