Hi,

This is mostly correct, but you cannot register a timer in open() because there is no active key there. You can only register a timer in processElement() and onTimer().

In your case, I would suggest either clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer.

Best,
Aljoscha

On 5. Sep 2017, at 16:55, Kien Truong <[hidden email]> wrote:

Hi,
You can register a processing-time timer inside the onTimer and open functions to have a timer that runs periodically.
Pseudo-code example:
    ValueState<Long> lastRuntime;

    void open() {
        ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
    }

    void onTimer() {
        // Run the periodic task
        if (lastRuntime.get() + 60000 == timeStamp) {
            periodicTask();
        }
        // Re-register the processing time timer
        lastRuntime.setValue(timeStamp);
        ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
    }

    void periodicTask()

For the second question, timers are already scoped by key, so you can keep a lastModified variable as a ValueState,
then compare it to the timestamp provided by the timer to see if the current key should be evicted.
Check out the example on the ProcessFunction page.
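
The comparison described above can be sketched in plain Java, without any Flink dependency. This is only a model under stated assumptions: the class StaleKeyEviction, its methods, and the 2-minute TTL_MS are hypothetical names chosen for illustration, and the HashMap stands in for what would be a key-scoped ValueState<Long> in a real job.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key eviction; NOT Flink API. All names are hypothetical.
final class StaleKeyEviction {
    // Assumed time-to-live of 2 minutes, as in the question.
    static final long TTL_MS = 120_000L;

    // Stand-in for keyed state: in Flink this would be a ValueState<Long>
    // that is automatically scoped to the current key.
    private final Map<String, Long> lastModified = new HashMap<>();

    // Models processElement: remember when the key was last updated and
    // return the timestamp at which a cleanup timer would be registered.
    long onElement(String key, long timestampMs) {
        lastModified.put(key, timestampMs);
        return timestampMs + TTL_MS;
    }

    // Models onTimer: evict only if no newer element has arrived since the
    // firing timer was registered. Returns true if the key was evicted.
    boolean onTimer(String key, long timerTimestampMs) {
        Long last = lastModified.get(key);
        if (last != null && timerTimestampMs >= last + TTL_MS) {
            lastModified.remove(key);
            return true;
        }
        return false;
    }

    boolean hasState(String key) {
        return lastModified.containsKey(key);
    }

    public static void main(String[] args) {
        StaleKeyEviction state = new StaleKeyEviction();
        long firstTimer = state.onElement("user-1", 0L);
        long secondTimer = state.onElement("user-1", 60_000L);
        System.out.println("evicted by first timer:  " + state.onTimer("user-1", firstTimer));
        System.out.println("evicted by second timer: " + state.onTimer("user-1", secondTimer));
    }
}
```

The first timer finds a newer lastModified value and does nothing; only the timer that belongs to the latest update removes the state.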
Best regards,
Kien

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
Hi All,
I have a streaming pipeline which is keyed by userid and then passed to a flatMap function. I need to clear the state after some time, and I was looking at ProcessFunction for it.

Inside the processElement function, if I register a timer, wouldn't it create a timer for each incoming message?

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);

How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside the onTimer function so that I know which key has to be evicted?

Thanks,
Navneeth
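
The clamping suggestion from the first reply in this thread can be sketched as plain arithmetic. The idea, as far as I understand it, is that Flink keeps at most one timer per key and timestamp, so if every element in the same 2-minute window computes the same timer timestamp, repeated registrations collapse into one. The class TimerClamp and the method nextBoundary are hypothetical names, not part of Flink.

```java
// Plain-Java sketch of timestamp clamping; names are hypothetical, not Flink API.
final class TimerClamp {
    // Rounds timestampMs up to the next multiple of intervalMs that lies
    // strictly after timestampMs.
    static long nextBoundary(long timestampMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return Math.floorDiv(timestampMs, intervalMs) * intervalMs + intervalMs;
    }

    public static void main(String[] args) {
        long interval = 120_000L; // 2 minutes
        // Two elements in the same window map to the same timer timestamp.
        System.out.println(nextBoundary(1_000L, interval));
        System.out.println(nextBoundary(119_999L, interval));
        // An element in the next window maps to the next boundary.
        System.out.println(nextBoundary(120_000L, interval));
    }
}
```

In processElement, the result of nextBoundary would be the value passed to registerEventTimeTimer or registerProcessingTimeTimer in place of current.timestamp + 60000.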