Incremental state

Incremental state

Annemarie Burger
Hi,

What I'm trying to do is the following: I want to incrementally add elements
to and delete elements from a state. If an element expires (goes out of the
window), it needs to be removed from the state. I basically want the
functionality of TTL without using it, since I'm also using Queryable State and
these two features can't be combined. Of course I can give each element a
"valid until" time when I add it to the state in a ProcessFunction, and
periodically iterate over the state to remove expired elements, but I was
wondering if there is a more efficient way. For example, I could use a timer
that is given the element as a parameter, so that when the timer fires, x
seconds after it was set, it can look up the element directly and remove it.
But how would I implement this?
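A minimal sketch of this idea, assuming event time and a fixed TTL. A Flink timer only carries its timestamp (and the current key), so rather than handing the element to the timer, the sketch stores elements under their expiry time; the firing timer's timestamp then serves as the lookup key. Flink keeps at most one timer per key and timestamp, so elements with the same expiry share a timer. To stay runnable without Flink on the classpath, keyed state is modeled with a plain HashMap, and the class name ExpiringElements is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the keyed state of a KeyedProcessFunction.
// In Flink, `byExpiry` would be a MapState<Long, List<T>>, the value returned by add()
// would be passed to ctx.timerService().registerEventTimeTimer(...), and onTimer()
// would be called from KeyedProcessFunction#onTimer.
class ExpiringElements<T> {
    private final long ttlMillis;
    // Elements grouped by the timestamp at which they expire ("valid until").
    private final Map<Long, List<T>> byExpiry = new HashMap<>();

    ExpiringElements(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores the element and returns the timestamp of the timer to register for it. */
    long add(T element, long elementTimestamp) {
        long expiry = elementTimestamp + ttlMillis;
        byExpiry.computeIfAbsent(expiry, k -> new ArrayList<>()).add(element);
        return expiry;
    }

    /** The timer for `timestamp` fired: look up the expired elements directly and drop them. */
    List<T> onTimer(long timestamp) {
        List<T> expired = byExpiry.remove(timestamp);
        return expired != null ? expired : new ArrayList<T>();
    }

    /** Number of elements still held in state. */
    int size() {
        int n = 0;
        for (List<T> bucket : byExpiry.values()) {
            n += bucket.size();
        }
        return n;
    }
}
```

In a KeyedProcessFunction, processElement would do the equivalent of add() on a MapState and register the returned timestamp, and onTimer would remove that single map entry, so no full scan of the state is needed.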

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Incremental state

Timo Walther
Hi Annemarie,

if TTL is what you are looking for and queryable state is what limits
you, it might make sense to come up with a custom implementation of
queryable state? Reimplementing TTL yourself might be more difficult. As
far as I know, queryable state is more of an experimental feature without
any consistency guarantees. A function could offer this functionality
itself using some socket/web-service library, or you could expose insights
through a side output into a sink such as Elasticsearch.

Otherwise, it might be useful to "batch" the cleanups. In Flink's SQL
engine, a user can define a minimum and a maximum retention time. Timers
are always set based on the maximum retention time, but during a cleanup,
elements that have already exceeded the minimum retention time are cleaned
up along the way (see [1]). This could improve performance.
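A simplified sketch of how a min/max retention time reduces timer work, assuming processing-time cleanup per key. It paraphrases the approach rather than copying [1]: a new cleanup timer at now + maxRetention is registered only when the existing one would fire before now + minRetention, so a steady stream of updates does not register one timer per element. The class RetentionTimer is hypothetical; the corresponding TimerService calls appear only in comments.

```java
// Hypothetical model of per-key cleanup-timer handling with a min/max retention time.
// In Flink, `registeredTimer` would be kept in a ValueState<Long>; the comments mark
// where TimerService calls would go.
class RetentionTimer {
    private final long minRetention;
    private final long maxRetention;
    private Long registeredTimer; // null: no cleanup timer registered for this key
    private int registrations;    // how many timers were registered, for illustration

    RetentionTimer(long minRetention, long maxRetention) {
        if (minRetention < 0 || minRetention > maxRetention) {
            throw new IllegalArgumentException("require 0 <= minRetention <= maxRetention");
        }
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Called for every element processed at processing time `now`. */
    void onElement(long now) {
        // State must live at least until now + minRetention. If the existing timer
        // already fires at or after that point, keep it and register nothing.
        if (registeredTimer == null || now + minRetention > registeredTimer) {
            // Flink: if registeredTimer != null, deleteProcessingTimeTimer(registeredTimer);
            //        then registerProcessingTimeTimer(now + maxRetention).
            registeredTimer = now + maxRetention;
            registrations++;
        }
    }

    /** Returns true if the state for this key should be cleared at `timestamp`. */
    boolean onTimer(long timestamp) {
        if (registeredTimer != null && timestamp == registeredTimer) {
            registeredTimer = null;
            return true;
        }
        return false; // stale timer; with deleteProcessingTimeTimer it would not fire at all
    }

    int registrations() {
        return registrations;
    }
}
```

The wider the gap between the two retention times, the fewer timers get registered, at the cost of keeping state for up to maxRetention instead of exactly one fixed TTL.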

If the cleanup is based on event time, it is also possible to use timers
more efficiently by setting only one timer per watermark [2].
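A sketch of the watermark-coalescing variant, assuming each element carries its own event-time expiry. Following the pattern in [2], each element registers a timer at currentWatermark() + 1, so all elements arriving before the next watermark share one timer per key. When it fires, everything that has expired is dropped in one pass, and a new timer is registered only while state remains. The sorted TreeMap and the class name WatermarkCoalescedCleanup are assumptions for illustration; with Flink's MapState you would iterate over the entries instead of calling headMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of watermark-coalesced cleanup for one key. In Flink the map would be
// keyed state, and the timer calls mentioned in comments would go to ctx.timerService().
class WatermarkCoalescedCleanup<T> {
    // Elements grouped by their event-time expiry, kept sorted for range removal.
    private final TreeMap<Long, List<T>> byExpiry = new TreeMap<>();

    /** Stores the element; returns the timer to register: currentWatermark + 1. */
    long add(T element, long expiry, long currentWatermark) {
        byExpiry.computeIfAbsent(expiry, k -> new ArrayList<>()).add(element);
        // All elements added before the next watermark map to the same timer timestamp,
        // and Flink keeps at most one timer per key and timestamp.
        return currentWatermark + 1;
    }

    /** The timer at `timestamp` fired: drop every element that expired at or before it. */
    List<T> onTimer(long timestamp) {
        List<T> expired = new ArrayList<>();
        NavigableMap<Long, List<T>> due = byExpiry.headMap(timestamp, true);
        for (List<T> bucket : due.values()) {
            expired.addAll(bucket);
        }
        due.clear(); // clearing the view also removes these entries from byExpiry
        return expired;
    }

    /** If true, register another timer at currentWatermark + 1 from within onTimer. */
    boolean needsTimer() {
        return !byExpiry.isEmpty();
    }
}
```

The trade-off is that a key with live state sees one timer firing per watermark, even when nothing has expired yet.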

I hope this helps.

Regards,
Timo

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#timer-coalescing


On 09.06.20 16:29, Annemarie Burger wrote:


Re: Incremental state

Congxian Qiu
Hi,

Can a process function [1] meet your needs here? You can implement the TTL logic yourself using timers in a process function.
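A sketch of that suggestion for a single value per key, modeled on the timeout example in the ProcessFunction documentation: each write records a last-modified time and registers a timer at lastModified + ttl, and a firing timer clears the value only if no newer write happened in the meantime. This roughly mimics a TTL that is refreshed on every write. Because no StateTtlConfig is involved, the underlying ValueState descriptor could still be made queryable via StateDescriptor#setQueryable. The class RefreshingTtlValue is hypothetical, with state modeled as plain fields to keep the sketch runnable.

```java
// Hypothetical model of a single keyed value with timer-based, refresh-on-write expiry.
// In Flink, `value` and `lastModified` would live in keyed state (e.g. a ValueState of a
// small POJO), and update()'s return value would go to
// ctx.timerService().registerEventTimeTimer(...).
class RefreshingTtlValue<V> {
    private final long ttlMillis;
    private V value;            // null means "no value / expired"
    private long lastModified;

    RefreshingTtlValue(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Writes a new value at time `now`; returns the timer timestamp to register. */
    long update(V newValue, long now) {
        value = newValue;
        lastModified = now;
        return now + ttlMillis;
    }

    /** Clears the value only if `timestamp` belongs to the latest write. */
    boolean onTimer(long timestamp) {
        if (value != null && timestamp == lastModified + ttlMillis) {
            value = null;
            return true;
        }
        return false; // outdated timer from an earlier write: the value was refreshed since
    }

    V get() {
        return value;
    }
}
```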


On Wed, Jun 10, 2020 at 9:36 PM, Timo Walther <[hidden email]> wrote: