Timers and state

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Timers and state

Alberto Mancini
Hello,
in a Flink application we have a keyed operator that keeps a map state (MapState<UK, UV>). Some of the elements in the state need a timeout so we use a Timer.
When the timer is called the state is scoped to the key as expected but we would like to 'pass' (or have available elsewhere) to onTimer the map-key that is expiring.
The actual approach is to visit the whole map and select the correct key keeping in the map-values the expected timestamp of the timer but this seems overkill.
Is there consolidated a better approach we do not see?

Thanks,
       Alberto.

Reply | Threaded
Open this post in threaded view
|

Re: Timers and state

Xingcan Cui
Hi Alberto,

an ultimate solution for your problem would be a map state with ordered keys (like a TreeMap), but unfortunately, this is still a WIP feature. 

For now, maybe you could "eagerly remove” the outdated value (with `iterator.remove()`) when iterating the map state in the process function or split the key space for your map state into static bins, thus you could calculate a set of outdated keys before removing them.

Hope that helps.

Best,
Xingcan

On 5 Mar 2018, at 4:19 PM, Alberto Mancini <[hidden email]> wrote:

 

Reply | Threaded
Open this post in threaded view
|

Re: Timers and state

Fabian Hueske-2
Hi Alberto,

You can also add another MapState<Long, Key>. The key is a timestamps and the value is the key that you want to discard.
When onTimer() is called, you look up the key in the MapState<Long, Key> and and remove it from the original MapState.

Best, Fabian

2018-03-05 0:48 GMT-08:00 Xingcan Cui <[hidden email]>:
Hi Alberto,

an ultimate solution for your problem would be a map state with ordered keys (like a TreeMap), but unfortunately, this is still a WIP feature. 

For now, maybe you could "eagerly remove” the outdated value (with `iterator.remove()`) when iterating the map state in the process function or split the key space for your map state into static bins, thus you could calculate a set of outdated keys before removing them.

Hope that helps.

Best,
Xingcan

On 5 Mar 2018, at 4:19 PM, Alberto Mancini <[hidden email]> wrote: