Re: Process Function

Posted by Navneeth Krishnan on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Process-Function-tp15376p15395.html

Thanks a lot, everyone. I have the user data ingested from Kafka, and it is keyed by userid. There are around 80 parallel flatMap operator instances after the keyBy, and there are a few million users. The map state has userid as the key and some value. I guess I will try the approach Aljoscha mentioned and see how it works.

On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

This is mostly correct, but you cannot register a timer in open() because we don't have an active key there. You can only register a timer in processElement() and onTimer().

In your case, I would suggest somehow clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer.
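Why clamping helps: Flink keeps at most one timer per key and timestamp, so if every element of a user asks for a timer at the end of its current 2-minute interval, that key gets at most one timer per interval instead of one per message. The sketch below is a plain-Java model of that arithmetic under those assumptions, not Flink code; `TimerClamping`, `clampToNextInterval` and `distinctTimers` are hypothetical names, and the 2-minute interval is taken from the original question.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Plain-Java model (not Flink code): clamp each requested timer to the end of
 * its interval so that all events of one key inside that interval request the
 * same timer timestamp.
 */
final class TimerClamping {
    static final long TWO_MINUTES_MS = 2 * 60 * 1000;

    /** Rounds a non-negative timestamp up to the next interval boundary (strictly after ts). */
    static long clampToNextInterval(long ts, long intervalMs) {
        return (ts / intervalMs + 1) * intervalMs;
    }

    /**
     * Counts distinct (key, timer timestamp) pairs requested by a sequence of events.
     * Because Flink keeps only one timer per key and timestamp, this models how many
     * timers would actually be stored and fired.
     */
    static int distinctTimers(List<String> keys, List<Long> timestamps, long intervalMs) {
        Map<String, Set<Long>> timersPerKey = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            long timer = clampToNextInterval(timestamps.get(i), intervalMs);
            timersPerKey.computeIfAbsent(keys.get(i), k -> new HashSet<>()).add(timer);
        }
        int total = 0;
        for (Set<Long> timers : timersPerKey.values()) {
            total += timers.size();
        }
        return total;
    }
}
```

For example, events for user u1 at 1 s, 50 s and 130 s plus one event for u2 at 1 s request only three distinct timers (u1 at 120 s and 240 s, u2 at 120 s). In a real ProcessFunction the clamped value would be passed to `ctx.timerService().registerProcessingTimeTimer(...)`, computed from `ctx.timerService().currentProcessingTime()` (or to `registerEventTimeTimer(...)` with the element timestamp).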

Best,
Aljoscha

On 5. Sep 2017, at 16:55, Kien Truong <[hidden email]> wrote:

Hi,

You can register a processing-time timer inside onTimer() and open() to get a timer that runs periodically.

Pseudo-code example:

ValueState<Long> lastRuntime;

void open() {
  ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}

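void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) {
  // Run the periodic task if this timer fired one period after the last run
  Long last = lastRuntime.value();
  if (last == null || last + 60000 == timestamp) {
    periodicTask();
  }
  // Remember this run and re-register the processing-time timer
  lastRuntime.update(timestamp);
  ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
}

void periodicTask() {
  // ...
}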
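Since open() has no current key (as Aljoscha points out in the reply above), the first timer of such a periodic chain has to be registered from processElement(). That is where his second suggestion, an extra ValueState recording whether a timer is already pending, fits in. Below is a minimal plain-Java model of that pattern, assuming a hypothetical per-key ValueState<Long> `pendingTimer`, represented here by a HashMap; `PeriodicTimerModel` and its methods are illustrative names, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (not Flink code) of a per-key periodic timer whose first
 * registration happens in processElement() and is guarded by a "pending timer" value.
 */
final class PeriodicTimerModel {
    static final long PERIOD_MS = 60_000; // 60 s, as in the pseudo-code above

    // Stands in for a per-key ValueState<Long>: timestamp of the pending timer, if any.
    private final Map<String, Long> pendingTimer = new HashMap<>();
    private int registrations = 0; // calls that would go to registerProcessingTimeTimer
    private int periodicRuns = 0;  // executions of the periodic task

    /** Models processElement(): start the timer chain only if no timer is pending for this key. */
    void processElement(String key, long now) {
        if (!pendingTimer.containsKey(key)) {
            register(key, now + PERIOD_MS);
        }
    }

    /** Models onTimer(): run the periodic task, then schedule the next firing. */
    void onTimer(String key, long timestamp) {
        periodicRuns++;
        register(key, timestamp + PERIOD_MS);
    }

    private void register(String key, long timerTimestamp) {
        pendingTimer.put(key, timerTimestamp);
        registrations++;
    }

    int registrations() { return registrations; }

    int periodicRuns() { return periodicRuns; }

    Long pendingTimer(String key) { return pendingTimer.get(key); }
}
```

Feeding three elements for `u1` at 0 s, 1 s and 5 s results in a single registration (a timer at 60 s), and each onTimer() call schedules exactly one successor. In Flink the map lookups would become `value()` and `update(...)` calls on a ValueState, which is automatically scoped to the current key.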

For the second question, timers are already scoped by key, so you can keep a lastModified variable as a ValueState,
then compare it to the timestamp provided by the timer to see whether the current key should be evicted.
Check out the example on the ProcessFunction page.
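A minimal plain-Java model of this eviction check, assuming a 2-minute timeout and a hypothetical per-key `lastModified` value held in a HashMap in place of Flink's ValueState. Every element refreshes lastModified and requests a timer at lastModified + timeout; when a timer fires, only the one matching the latest update evicts the key, so timers left over from earlier updates are harmless. `LastModifiedEviction` is an illustrative name, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink code) of lastModified-based eviction per key. */
final class LastModifiedEviction {
    static final long TIMEOUT_MS = 2 * 60 * 1000; // assumed 2-minute timeout

    // Stands in for a per-key ValueState<Long> lastModified.
    private final Map<String, Long> lastModified = new HashMap<>();

    /** Models processElement(): record the update time and return the timer timestamp to register. */
    long processElement(String key, long now) {
        lastModified.put(key, now);
        return now + TIMEOUT_MS;
    }

    /**
     * Models onTimer(): evicts the key only if this timer belongs to the latest update.
     * Returns true if the key was evicted.
     */
    boolean onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        if (last != null && timestamp == last + TIMEOUT_MS) {
            lastModified.remove(key); // in Flink: clear() on this key's state
            return true;
        }
        return false;
    }

    boolean contains(String key) { return lastModified.containsKey(key); }
}
```

In Flink the same comparison goes in onTimer(), where state access is already scoped to the key whose timer fired. With a KeyedProcessFunction (available in later Flink releases), `ctx.getCurrentKey()` inside onTimer() also returns that key, which covers the question of knowing which key to evict.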


Best regards,
Kien

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
Hi All,

I have a streaming pipeline that is keyed by userid and then goes to a flatMap function. I need to clear the state after some time, and I was looking at ProcessFunction for this.

If I register a timer inside processElement(), wouldn't that create a timer for each incoming message?
    // schedule the next timer 60 seconds from the current event time
    ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside onTimer() so that I know which key has to be evicted?

Thanks,
Navneeth