Re: Process Function

Posted by Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Process-Function-tp15376p15404.html

Hi,

I'm actually not very familiar with the current Table API implementation, but Fabian or Timo (cc'ed) should know more. I strongly suspect that it is implemented like this, yes.

Best,
Aljoscha

On 5. Sep 2017, at 21:14, Johannes Schulte <[hidden email]> wrote:

Hi,

One short question that fits here: when using the higher-level streaming APIs, we can set a minimum and maximum retention time [1], which is probably used to reduce the number of timers registered under the hood. How is this implemented? By registering a "clamped" timer?
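One scheme consistent with this question, sketched in plain Java under assumptions (this is not the Table API code, and `RetentionTimerSketch`, `onElement` and `cleanupTimeOf` are hypothetical names): keep each key's pending cleanup time, register a new timer only when the pending one would fire earlier than now plus the minimum retention, and place the new one at now plus the maximum retention. A HashMap stands in for per-key state.

```java
import java.util.HashMap;
import java.util.Map;

public class RetentionTimerSketch {
    private final long minRetention;
    private final long maxRetention;
    // Hypothetical stand-in for a per-key ValueState<Long> holding the pending cleanup time
    private final Map<String, Long> cleanupTime = new HashMap<>();
    // Counts how many timers would have been registered with the timer service
    int timersRegistered = 0;

    public RetentionTimerSketch(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    // Called for every element of a key at processing time 'now'
    public void onElement(String key, long now) {
        Long current = cleanupTime.get(key);
        // Register only if there is no pending timer, or the pending one would
        // clear the state before the minimum retention time has passed
        if (current == null || now + minRetention > current) {
            cleanupTime.put(key, now + maxRetention);
            timersRegistered++;
        }
    }

    public Long cleanupTimeOf(String key) {
        return cleanupTime.get(key);
    }

    public static void main(String[] args) {
        // Minimum 60 s, maximum 120 s of idle retention
        RetentionTimerSketch s = new RetentionTimerSketch(60_000, 120_000);
        // One element per second for two minutes, all for the same key
        for (long t = 0; t < 120_000; t += 1_000) {
            s.onElement("user-1", t);
        }
        System.out.println(s.timersRegistered);
    }
}
```

With these settings, 120 elements for one key trigger only 2 registrations. The superseded timer still fires, so the corresponding onTimer() would need to compare its timestamp with the stored cleanup time before clearing state.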

Thanks,

Johannes


On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

This is mostly correct, but you cannot register a timer in open() because there is no active key there. You can register timers only in processElement() and onTimer().

In your case, I would suggest clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer.
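A plain-Java sketch of the clamping option, assuming timestamps are rounded up to the end of the current 2-minute interval (rounding down could yield a time that has already passed). It relies on Flink's timer service keeping at most one timer per key and timestamp, as described in the ProcessFunction documentation; here a HashSet of key/timestamp strings stands in for that service, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class TimerClampingSketch {
    static final long INTERVAL = 2 * 60_000L; // 2-minute cleanup interval in ms

    // Round a timestamp up to the end of the 2-minute interval it falls into
    static long clamp(long timestamp) {
        return (timestamp / INTERVAL + 1) * INTERVAL;
    }

    // Simulates registering one clamped timer per element; duplicates of the
    // same (key, timestamp) pair collapse, as they would in the timer service
    static int timersWithClamping(String key, long[] eventTimes) {
        Set<String> timers = new HashSet<>();
        for (long t : eventTimes) {
            timers.add(key + "@" + clamp(t));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        // One event every 3 seconds for 5 minutes (100 events) for a single key
        long[] events = new long[100];
        for (int i = 0; i < events.length; i++) {
            events[i] = i * 3_000L;
        }
        System.out.println(timersWithClamping("user-1", events));
    }
}
```

For that input the 100 clamped timestamps collapse to 3 distinct timers, at 120000, 240000 and 360000 ms.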

Best,
Aljoscha

On 5. Sep 2017, at 16:55, Kien Truong <[hidden email]> wrote:

Hi,

You can register a processing-time timer inside onTimer() and open() to get a timer that runs periodically.

Pseudo-code example:

ValueState<Long> lastRuntime;

void open() {
  // Schedule the first run one minute from now
  ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60000);
}

void onTimer(long timestamp) {
  // Run the periodic task if this is the timer scheduled by the previous run
  Long last = lastRuntime.value();
  if (last == null || last + 60000 == timestamp) {
    periodicTask();
  }
  // Remember this run and re-register the processing time timer
  lastRuntime.update(timestamp);
  ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
}

void periodicTask() { ... }
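The periodic pattern can be simulated in plain Java to check the control flow. This is a toy model with hypothetical names, not Flink's API: a PriorityQueue plays the processing-time timer service, and, because open() has no active key, the first timer for a key is registered in processElement(), guarded by a per-key flag that stands in for a ValueState.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

public class PeriodicTimerSketch {
    static final long PERIOD = 60_000L;

    // A pending timer for one key (toy stand-in for Flink's internal timers)
    static final class Timer {
        final long timestamp;
        final String key;
        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
    }

    // Pending timers ordered by timestamp
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // Stand-in for a per-key ValueState<Boolean> "first timer already scheduled"
    private final Set<String> scheduled = new HashSet<>();
    // Records each execution of the periodic task as "key@timestamp"
    final List<String> runs = new ArrayList<>();

    // The first timer is registered here, where a key is active
    void processElement(String key, long now) {
        advanceTo(now);
        if (scheduled.add(key)) {
            timers.add(new Timer(now + PERIOD, key));
        }
    }

    // Fire every pending timer with timestamp <= now, in timestamp order
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().timestamp <= now) {
            Timer t = timers.poll();
            onTimer(t.key, t.timestamp);
        }
    }

    void onTimer(String key, long timestamp) {
        runs.add(key + "@" + timestamp);                  // the periodic task
        timers.add(new Timer(timestamp + PERIOD, key));   // re-register for the next period
    }

    public static void main(String[] args) {
        PeriodicTimerSketch s = new PeriodicTimerSketch();
        s.processElement("user-1", 0);
        s.processElement("user-1", 10_000); // same key: no second timer chain
        s.advanceTo(180_000);
        System.out.println(s.runs);
    }
}
```

Two elements for the same key start only one timer chain, and advancing to 180000 ms runs the task at 60000, 120000 and 180000 ms.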

For the second question: timers are already scoped by key, so you can keep a lastModified value in a ValueState and compare it with the timestamp passed to onTimer() to decide whether the current key should be evicted.
Check out the example on the ProcessFunction documentation page.
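A plain-Java sketch of the lastModified check; HashMaps stand in for the per-key ValueState, the names are hypothetical, and timer registration is only noted in a comment because there is no Flink runtime here. A timer registered by an older element is ignored because a newer element has moved lastModified forward.

```java
import java.util.HashMap;
import java.util.Map;

public class IdleStateEvictionSketch {
    static final long TIMEOUT = 60_000L;

    // Stand-ins for per-key ValueState: the user's data and its lastModified time
    final Map<String, String> userState = new HashMap<>();
    final Map<String, Long> lastModified = new HashMap<>();

    // Called per element; in Flink this would also register a timer at now + TIMEOUT
    void processElement(String key, String value, long now) {
        userState.put(key, value);
        lastModified.put(key, now);
    }

    // Called when a timer for 'key' fires at 'timestamp'. Because timers are
    // scoped by key, only this key's state is inspected. Evict only if nothing
    // updated the key since the timer was registered.
    boolean onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        if (last != null && timestamp == last + TIMEOUT) {
            userState.remove(key);
            lastModified.remove(key);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IdleStateEvictionSketch s = new IdleStateEvictionSketch();
        s.processElement("user-1", "a", 0);       // would register a timer for 60000
        s.processElement("user-1", "b", 30_000);  // would register a timer for 90000
        System.out.println(s.onTimer("user-1", 60_000)); // stale timer: state is kept
        System.out.println(s.onTimer("user-1", 90_000)); // idle for TIMEOUT: evicted
    }
}
```

Running main prints false and then true.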


Best regards,
Kien

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
Hi All,

I have a streaming pipeline that is keyed by userid and then goes into a flatMap function. I need to clear the state after some time, and I was looking at ProcessFunction for it.

Inside processElement(), if I register a timer like this, wouldn't it create a timer for each incoming message?

// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);

How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside the onTimer function so that I know which key has to be evicted?

Thanks,
Navneeth