timeWindow()s and queryable state

Ron Crocker
Hi all -

I’m trying to keep some state around for a little while after a window fires, so that I can use it as queryable state. I intend to use something like:

.keyBy(<aggregation key, no explicit time component>)
.timeWindow(Time.minutes(1)).allowedLateness(Time.minutes(90))
.aggregate(…)
.keyBy(<query key, includes a time component>)
.asQueryableState(...)

My intent is to keep that window available for 90 minutes. I’m not sure how I feel about this pattern - it feels more side-effect-y than intentional.

My questions:
a) Is that actually going to keep the window (and, by implication, the downstream state) around?
b) Is there a “more correct” way to do this? Maybe it would be better to use some kind of time-aware reducing state that will provide some lingering state?

Before you ask: no, I haven’t run it yet to see what it does. That’s next, but I figured I’d ask for your advice first.

Re: timeWindow()s and queryable state

Dawid Wysakowicz

Hey Ron,

I am pretty sure the queryable state will not do any pruning: it will keep the state for every window seen so far. The allowedLateness setting applies to the window computation, not to the queryable state part. `asQueryableState` creates a downstream operator that keeps updating its state with the results of the window operator, and that operator has no notion of the upstream window's lifetime.
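To make that distinction concrete, here is a small plain-Java model. It is not Flink code, and the class, field, and method names are hypothetical. It assumes a one-minute event-time tumbling window that counts elements, keeps its state until the watermark reaches the window's last timestamp plus 90 minutes of allowed lateness, and emits every on-time or late result into a downstream map that stands in for the `asQueryableState` operator. The cleanup and late-firing rules follow my understanding of Flink's window operator with the default event-time trigger. Only the window side ever prunes anything.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not the Flink API) of a window with allowed lateness feeding a queryable store. */
public class WindowVsQueryableModel {
    static final long WINDOW_SIZE = 60_000L;           // 1-minute tumbling windows
    static final long ALLOWED_LATENESS = 90 * 60_000L; // 90 minutes

    // Window operator state: window start -> running count for that window.
    final Map<Long, Long> windowState = new HashMap<>();
    // Downstream "queryable" store: window start -> last result emitted for it.
    final Map<Long, Long> queryableState = new HashMap<>();
    long watermark = Long.MIN_VALUE;

    static long windowStart(long ts) { return ts - Math.floorMod(ts, WINDOW_SIZE); }
    static long maxTimestamp(long start) { return start + WINDOW_SIZE - 1; }
    // Window state is kept until the watermark reaches the window's last timestamp + lateness.
    static long cleanupTime(long start) { return maxTimestamp(start) + ALLOWED_LATENESS; }

    void onElement(long ts) {
        long start = windowStart(ts);
        if (watermark >= cleanupTime(start)) {
            return; // window already purged: the element is dropped
        }
        long count = windowState.merge(start, 1L, Long::sum);
        if (watermark >= maxTimestamp(start)) {
            queryableState.put(start, count); // late firing updates the downstream store
        }
    }

    void advanceWatermark(long newWatermark) {
        for (Map.Entry<Long, Long> e : windowState.entrySet()) {
            long end = maxTimestamp(e.getKey());
            if (watermark < end && newWatermark >= end) {
                queryableState.put(e.getKey(), e.getValue()); // on-time firing
            }
        }
        watermark = newWatermark;
        // Only the window operator prunes; queryableState is never cleared.
        windowState.keySet().removeIf(start -> watermark >= cleanupTime(start));
    }
}
```

For example, after an element at t = 10 s and a watermark of 60 s, the window [0 s, 60 s) fires. A late element at 20 s then updates the result. Once the watermark reaches 5,459,999 ms, the window state is purged and further late elements are dropped, but the downstream map still holds the last count and would keep it indefinitely.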

If you want finer-grained control over what is kept in queryable state, and for how long, you can write your own process function whose state you configure to be queryable via [1]:

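import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
desc.setQueryable("vanilla"); // expose this state to queries under the name "vanilla"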
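One way to get that finer-grained control is sketched below. It is a plain-Java model rather than Flink code, and all names are hypothetical. The model keeps the latest result per key, remembers a per-key cleanup deadline, and clears both when an event-time "timer" for that deadline fires. In a real job this would presumably be a `KeyedProcessFunction` whose `ValueState` descriptors call `setQueryable(...)`. It would register the timer with `ctx.timerService().registerEventTimeTimer(...)` in `processElement` and call `clear()` on the state in `onTimer`. The 90-minute retention, measured from each result's event time, is an assumption taken from Ron's example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink code) of queryable per-key state cleared by event-time timers. */
public class RetainedQueryableStore {
    static final long RETENTION = 90 * 60_000L; // hypothetical 90-minute retention

    // Per-key "ValueState": the latest result, which is what queries would read.
    final Map<String, Long> value = new HashMap<>();
    // Per-key "ValueState": the cleanup deadline currently in force for that key.
    final Map<String, Long> deadline = new HashMap<>();
    // Event-time "timer service": timestamp -> keys with a timer at that time.
    final TreeMap<Long, List<String>> timers = new TreeMap<>();

    /** Like processElement: store the result and (re)schedule its cleanup. */
    void processElement(String key, long result, long eventTime) {
        value.put(key, result);
        long cleanupAt = eventTime + RETENTION;
        deadline.put(key, cleanupAt);
        timers.computeIfAbsent(cleanupAt, t -> new ArrayList<>()).add(key);
    }

    /** Like the watermark advancing: fire every timer with timestamp <= watermark. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    /** Like onTimer: clear the state only if this timer is the key's latest deadline. */
    void onTimer(String key, long timestamp) {
        Long current = deadline.get(key);
        if (current != null && current == timestamp) {
            value.remove(key);
            deadline.remove(key);
        }
    }
}
```

The deadline check in `onTimer` matters because registering a new timer does not remove the earlier one, so a stale timer can still fire after the key has been refreshed. In Flink you could alternatively delete the old timer with `deleteEventTimeTimer`. To the best of my knowledge, Flink also rejects enabling state TTL on a descriptor that is marked queryable, which is why this sketch relies on timers rather than `StateTtlConfig`.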

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html#managed-keyed-state

In reply to Ron Crocker's message of 01/03/2021 17:39.