Migrating Existing TTL State to 1.8


Ning Shi
It's exciting to see the TTL state cleanup feature in 1.8. I have a question about migrating existing TTL state to the newer version.

Looking at the pull request [1] that introduced this feature, it seems that Flink leverages RocksDB's compaction filter to remove stale state. I assume this means that state is only cleaned up during compaction. If I have a significant amount of stale TTL state, some of which may already have been compacted to higher levels, upgrading to 1.8 may not clean it up. Is this assumption correct? If so, is the best approach to take a full snapshot/checkpoint and restore it in 1.8 so that the stale state is cleaned up on initialization?
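
To make the concern concrete, here is a toy model in plain Java. It is not RocksDB or Flink code, and every name in it (`CompactionTtlSketch`, `Entry`, `compact`) is hypothetical. It only illustrates the assumption in the question: a TTL filter that runs during compaction can drop expired entries only from the files that a compaction actually rewrites.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT RocksDB or Flink code. All names here are hypothetical.
final class CompactionTtlSketch {

    /** One stored value together with the timestamp of its last update. */
    static final class Entry {
        final String key;
        final long lastUpdateMillis;

        Entry(String key, long lastUpdateMillis) {
            this.key = key;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    /** Returns true when the entry is at least ttlMillis old at time nowMillis. */
    static boolean isExpired(Entry e, long ttlMillis, long nowMillis) {
        return nowMillis - e.lastUpdateMillis >= ttlMillis;
    }

    /** Models a compaction with a TTL filter: only the given file is rewritten. */
    static List<Entry> compact(List<Entry> file, long ttlMillis, long nowMillis) {
        List<Entry> rewritten = new ArrayList<>();
        for (Entry e : file) {
            if (!isExpired(e, ttlMillis, nowMillis)) {
                rewritten.add(e);
            }
        }
        return rewritten;
    }

    public static void main(String[] args) {
        long ttl = 1_000L;
        long now = 10_000L;

        // A "cold" file that holds only stale data and is never picked for compaction.
        List<Entry> coldFile = new ArrayList<>();
        coldFile.add(new Entry("a", 100L));
        coldFile.add(new Entry("b", 200L));

        // A "hot" file that still receives writes and gets compacted.
        List<Entry> hotFile = new ArrayList<>();
        hotFile.add(new Entry("c", 300L));
        hotFile.add(new Entry("d", 9_900L));

        // Only the hot file goes through the filter in this model.
        hotFile = compact(hotFile, ttl, now);

        System.out.println("hot file entries after compaction: " + hotFile.size());
        System.out.println("cold file entries (never compacted): " + coldFile.size());
    }
}
```

In this model the stale entries in the cold file stay on disk until something rewrites that file, which is the situation the question is worried about.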

Thanks,


--
Ning

Re: Migrating Existing TTL State to 1.8

Ning Shi
Just wondering if anyone has any insights into the new TTL state cleanup feature mentioned below.

Thanks,

Ning

On Mar 11, 2019, at 1:15 PM, Ning Shi <[hidden email]> wrote:

It's exciting to see the TTL state cleanup feature in 1.8. I have a question about migrating existing TTL state to the newer version.

Looking at the pull request [1] that introduced this feature, it seems that Flink leverages RocksDB's compaction filter to remove stale state. I assume this means that state is only cleaned up during compaction. If I have a significant amount of stale TTL state, some of which may already have been compacted to higher levels, upgrading to 1.8 may not clean it up. Is this assumption correct? If so, is the best approach to take a full snapshot/checkpoint and restore it in 1.8 so that the stale state is cleaned up on initialization?

Thanks,


--
Ning

Re: Migrating Existing TTL State to 1.8

Stefan Richter
Hi,

If you are worried about old state, you can combine the compaction-filter-based TTL cleanup with other cleanup strategies (see docs). For example, if you set `cleanupFullSnapshot`, a savepoint you take will be cleared of any expired state, and you can then use that savepoint to bring the job into Flink 1.8.
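
A minimal sketch of such a combined configuration, assuming the Flink 1.8 `StateTtlConfig` builder API and Flink on the classpath. The 7-day TTL, the class name, and the state name `lastSeen` are made up for illustration. `cleanupFullSnapshot()` is the method named in this thread; `cleanupInRocksdbCompactFilter()` is the no-argument 1.8 form of the compaction filter strategy. In 1.8 the filter also has to be enabled on the RocksDB backend itself (for example via the `state.backend.rocksdb.ttl.compaction.filter.enabled` option), so please check the docs for the exact version in use.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlConfigSketch {

    // Builds a descriptor whose state uses both cleanup strategies.
    public static ValueStateDescriptor<Long> lastSeenDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))         // hypothetical TTL
            .cleanupFullSnapshot()            // skip expired entries when a full snapshot (e.g. a savepoint) is written
            .cleanupInRocksdbCompactFilter()  // Flink 1.8 form: filter expired entries during RocksDB compaction
            .build();

        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("lastSeen", Long.class); // hypothetical state name
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```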

Best,
Stefan

On 13. Mar 2019, at 13:41, Ning Shi <[hidden email]> wrote:

Just wondering if anyone has any insights into the new TTL state cleanup feature mentioned below.

Thanks,

Ning


Re: Migrating Existing TTL State to 1.8

Ning Shi
Hi Stefan,

Thank you for the confirmation.

Doing a one-time cleanup with a full snapshot and then upgrading to
Flink 1.8 could work. However, in our case, the state is quite large
(TBs). Taking a savepoint takes over an hour, during which we have to
pause the job or it may process more events.

The JavaDoc of `cleanupFullSnapshot` [1] says "Cleanup expired state
in full snapshot on checkpoint." My understanding is that the only
way to take a full snapshot with the RocksDB backend is to take a
savepoint. Is there another way to take a full checkpoint?

I noticed that Flink 1.8 also added an incremental cleanup strategy
[2] that iterates through several keys at a time on each state access.
If I combine this with the new compaction filter cleanup strategy,
will it eventually remove all expired state without taking a full
snapshot for the upgrade?

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupFullSnapshot--
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupIncrementally-int-boolean-

Thanks,

Ning


On Wed, Mar 13, 2019 at 11:22 AM Stefan Richter <[hidden email]> wrote:
>
> Hi,
>
> If you are worried about old state, you can combine the compaction-filter-based TTL cleanup with other cleanup strategies (see docs). For example, if you set `cleanupFullSnapshot`, a savepoint you take will be cleared of any expired state, and you can then use that savepoint to bring the job into Flink 1.8.
>
> Best,
> Stefan

Re: Migrating Existing TTL State to 1.8

Andrey Zagrebin
Hi Ning,

If you have not activated non-incremental checkpointing, then taking a savepoint is the only way to trigger a full snapshot. In any case, it will take time.

The incremental cleanup strategy applies only to the heap state backend and does nothing for the RocksDB backend. At the moment, with the RocksDB backend you can only combine the compaction filter with full snapshot cleanup.
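
As a rough illustration of what full snapshot cleanup means, here is a toy model in plain Java. It is not Flink code, and all names in it (`FullSnapshotTtlSketch`, `fullSnapshot`) are hypothetical. It assumes the behaviour described in this thread: expired entries are left out of the snapshot as it is written, so a job restored from that snapshot starts without them, while the state of the running job is not touched by the snapshot itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: NOT Flink code. All names here are hypothetical.
final class FullSnapshotTtlSketch {

    /**
     * Copies every non-expired entry of the live state into a new snapshot map.
     * Keys are state keys, values are last-update timestamps in milliseconds.
     */
    static Map<String, Long> fullSnapshot(Map<String, Long> liveState, long ttlMillis, long nowMillis) {
        Map<String, Long> snapshot = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : liveState.entrySet()) {
            boolean expired = nowMillis - e.getValue() >= ttlMillis;
            if (!expired) {
                snapshot.put(e.getKey(), e.getValue());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<String, Long> liveState = new LinkedHashMap<>();
        liveState.put("stale", 100L);
        liveState.put("fresh", 9_900L);

        // The whole state is scanned, which is why this takes time for large state.
        Map<String, Long> savepoint = fullSnapshot(liveState, 1_000L, 10_000L);

        System.out.println("entries in running job: " + liveState.size());
        System.out.println("entries in savepoint:   " + savepoint.size());
    }
}
```

The full scan in `fullSnapshot` is also where the cost comes from: every entry has to be read once, regardless of how much of it is expired.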

Best,
Andrey

On Fri, Mar 15, 2019 at 11:56 PM Ning Shi <[hidden email]> wrote:
Hi Stefan,

Thank you for the confirmation.

Doing a one-time cleanup with a full snapshot and then upgrading to
Flink 1.8 could work. However, in our case, the state is quite large
(TBs). Taking a savepoint takes over an hour, during which we have to
pause the job or it may process more events.

The JavaDoc of `cleanupFullSnapshot` [1] says "Cleanup expired state
in full snapshot on checkpoint." My understanding is that the only
way to take a full snapshot with the RocksDB backend is to take a
savepoint. Is there another way to take a full checkpoint?

I noticed that Flink 1.8 also added an incremental cleanup strategy
[2] that iterates through several keys at a time on each state access.
If I combine this with the new compaction filter cleanup strategy,
will it eventually remove all expired state without taking a full
snapshot for the upgrade?

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupFullSnapshot--
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupIncrementally-int-boolean-

Thanks,

Ning



Re: Migrating Existing TTL State to 1.8

Ning Shi
Hi Andrey,

Thank you for the reply.

We are using incremental checkpointing.

Good to know that the incremental cleanup only applies to the heap state
backend. Looks like taking some downtime to take a full savepoint and restore
everything is inevitable.

Thanks,

--
Ning

On Wed, 15 May 2019 10:53:25 -0400,
Andrey Zagrebin wrote:

>
> Hi Ning,
>
> If you have not activated non-incremental checkpointing, then taking a
> savepoint is the only way to trigger a full snapshot. In any case, it
> will take time.
>
> The incremental cleanup strategy applies only to the heap state backend
> and does nothing for the RocksDB backend. At the moment, with the RocksDB
> backend you can only combine the compaction filter with full snapshot
> cleanup.
>
> Best,
> Andrey