Process Function

Navneeth Krishnan
Hi All,

I have a streaming pipeline that is keyed by userid and then passed to a flatMap function. I need to clear the state after some time, and I was looking at ProcessFunction for that.

Inside the processElement function, if I register a timer, wouldn't that create a timer for each incoming message?
// schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside the onTimer function so that I know which key has to be evicted?

Thanks,
Navneeth

Re: Process Function

Biplob Biswas
How are you determining your data is stale? Also if you want to know the key,
why don't you store the key in your state as well?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Process Function

Tzu-Li (Gordon) Tai
Hi Navneeth,

Currently, I don't think there is any built-in functionality to trigger onTimer periodically.
As for the second part of your question, do you mean that you want to find out which key the fired timer was registered for? I think this also isn't possible right now.

I'm looping in Aljoscha in CC in case he has more insight on this.

Cheers,
Gordon


On Tue, Sep 5, 2017 at 4:55 PM, Biplob Biswas <[hidden email]> wrote:
How are you determining your data is stale? Also if you want to know the key,
why don't you store the key in your state as well?




Re: Process Function

Kien Truong
In reply to this post by Navneeth Krishnan

Hi,

You can register a processing-time timer inside the onTimer and open functions to get a timer that runs periodically.

Pseudo-code example:

ValueState<Long> lastRuntime;

void open() {
  ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}

void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) {
  // Run the periodic task only if this timer is the one scheduled by the previous run
  Long last = lastRuntime.value();
  if (last != null && last + 60000 == timestamp) {
    periodicTask();
  }
  // Re-register the processing-time timer for the next run
  lastRuntime.update(timestamp);
  ctx.timerService().registerProcessingTimeTimer(timestamp + 60000);
}

void periodicTask() { /* periodic work goes here */ }

For the second question, timers are already scoped by key, so you can keep a lastModified variable as a ValueState
and compare it to the timestamp provided by the timer to decide whether the current key should be evicted.
Check out the example on the ProcessFunction page.
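
To make the lastModified comparison concrete, here is a minimal sketch in plain Java rather than real Flink code. The class StaleKeyEviction, its map-based stand-in for keyed ValueState, and the explicit key parameter are all assumptions made for illustration; in a real ProcessFunction the key is implicit, and the timestamp returned by onElement would be handed to a timer registration call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed function; not part of any Flink API.
final class StaleKeyEviction {
    static final long TIMEOUT_MS = 60_000L;

    // Stand-in for keyed state: in Flink this would be a ValueState<Long>
    // that is automatically scoped to the current key.
    private final Map<String, Long> lastModified = new HashMap<>();

    /** Records the access time for the key and returns the timer timestamp to register. */
    long onElement(String key, long nowMs) {
        lastModified.put(key, nowMs);
        return nowMs + TIMEOUT_MS;
    }

    /** Evicts the key only if no newer element arrived since this timer was scheduled. */
    boolean onTimer(String key, long timerTimestampMs) {
        Long last = lastModified.get(key);
        if (last != null && timerTimestampMs == last + TIMEOUT_MS) {
            lastModified.remove(key);
            return true;
        }
        return false; // an outdated timer: the key was touched again later
    }

    boolean hasState(String key) {
        return lastModified.containsKey(key);
    }

    public static void main(String[] args) {
        StaleKeyEviction fn = new StaleKeyEviction();
        long firstTimer = fn.onElement("user-1", 0L);
        long secondTimer = fn.onElement("user-1", 30_000L);
        System.out.println("first timer evicts: " + fn.onTimer("user-1", firstTimer));
        System.out.println("second timer evicts: " + fn.onTimer("user-1", secondTimer));
    }
}
```

In the demo, the timer scheduled by the first element fires after a newer element has arrived, so it is ignored; only the timer that matches the latest lastModified value evicts the key. Note that this pattern still schedules one timer per element.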


Best regards,
Kien

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
Hi All,

I have a streaming pipeline that is keyed by userid and then passed to a flatMap function. I need to clear the state after some time, and I was looking at ProcessFunction for that.

Inside the processElement function, if I register a timer, wouldn't that create a timer for each incoming message?
// schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside the onTimer function so that I know which key has to be evicted?

Thanks,
Navneeth

Re: Process Function

Aljoscha Krettek
Hi,

This is mostly correct, but you cannot register a timer in open() because there is no active key at that point. Timers can only be registered in processElement() and onTimer().

In your case, I would suggest either clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer.
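
As a rough illustration of the clamping idea, the sketch below rounds each timestamp up to the next 2-minute boundary, so that many elements for the same key map to the same timer timestamp. It is plain Java, not Flink code: the class TimerClamping, the method nextBoundary, and the set standing in for one key's timers are hypothetical names chosen for this example. It rounds up rather than to the nearest boundary so the timer always lies in the future, and it assumes non-negative timestamps. The reduction relies on Flink keeping at most one timer per key and timestamp.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; not part of any Flink API.
final class TimerClamping {
    static final long INTERVAL_MS = 2 * 60 * 1000L;

    /** Rounds a non-negative timestamp up to the next multiple of intervalMs. */
    static long nextBoundary(long timestampMs, long intervalMs) {
        return timestampMs - (timestampMs % intervalMs) + intervalMs;
    }

    public static void main(String[] args) {
        // Stand-in for the timers of a single key: a set holds each timestamp once,
        // mirroring the per-key, per-timestamp deduplication of timers.
        Set<Long> timersForKey = new HashSet<>();
        long[] elementTimes = {1_000L, 45_000L, 119_999L, 120_000L, 130_000L};
        for (long t : elementTimes) {
            // In a real function this value would be passed to a timer registration call.
            timersForKey.add(nextBoundary(t, INTERVAL_MS));
        }
        System.out.println(timersForKey.size() + " timers for "
                + elementTimes.length + " elements");
    }
}
```

With this scheme the number of timers per key is bounded by the number of 2-minute intervals in which the key was active, instead of by the number of elements.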

Best,
Aljoscha

On 5. Sep 2017, at 16:55, Kien Truong <[hidden email]> wrote:

Hi,

You can register a processing-time timer inside the onTimer and open functions to get a timer that runs periodically.


Re: Process Function

Navneeth Krishnan
Thanks a lot, everyone. The user data is ingested from Kafka and keyed by userid. There are around 80 parallel flatMap operator instances after the keyBy, and there are a few million users. The map state uses the userid as the key, with some value. I guess I will try the approach that Aljoscha mentioned and see how it works.

On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

This is mostly correct, but you cannot register a timer in open() because there is no active key at that point. Timers can only be registered in processElement() and onTimer().

In your case, I would suggest either clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer.

Best,
Aljoscha


Re: Process Function

Aljoscha Krettek
In reply to this post by Aljoscha Krettek
Hi,

I'm actually not very familiar with the current Table API implementations but Fabian or Timo (cc'ed) should know more. I suspect very much that this is implemented like this, yes.

Best,
Aljoscha

On 5. Sep 2017, at 21:14, Johannes Schulte <[hidden email]> wrote:

Hi,

One short question that fits here: when using higher-level streaming, we can set a min and max retention time [1], which is probably used to reduce the number of timers registered under the hood. How is this implemented? By registering a "clamped" timer?

Thanks,

Johannes
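
For readers wondering how a min/max retention pair could reduce timer registrations at all, here is one plausible scheme, sketched in plain Java. This is an assumption for illustration only: it is not confirmed anywhere in this thread and is not copied from Flink's source. The class RetentionCleanup and its fields are hypothetical. The idea is to schedule cleanup at now + max on first access, and to push it out again only when the existing cleanup time would no longer guarantee the minimum retention.

```java
// Hypothetical sketch of a min/max retention scheme; not a Flink class.
final class RetentionCleanup {
    private final long minRetentionMs;
    private final long maxRetentionMs;
    // Stand-in for per-key state holding the scheduled cleanup time (null = none).
    private Long cleanupTime;
    private int timersRegistered;

    RetentionCleanup(long minRetentionMs, long maxRetentionMs) {
        this.minRetentionMs = minRetentionMs;
        this.maxRetentionMs = maxRetentionMs;
    }

    /** Called on every state access; returns the cleanup time currently in effect. */
    long onAccess(long nowMs) {
        if (cleanupTime == null || nowMs + minRetentionMs > cleanupTime) {
            cleanupTime = nowMs + maxRetentionMs;
            timersRegistered++; // a real function would register a timer here
        }
        return cleanupTime;
    }

    /** Returns true if the fired timer is the current one and state should be cleared. */
    boolean onTimer(long timerTimestampMs) {
        if (cleanupTime != null && timerTimestampMs == cleanupTime) {
            cleanupTime = null;
            return true;
        }
        return false; // superseded by a later cleanup time
    }

    int timersRegistered() {
        return timersRegistered;
    }

    public static void main(String[] args) {
        RetentionCleanup state = new RetentionCleanup(3_600_000L, 7_200_000L);
        state.onAccess(0L);
        state.onAccess(1_800_000L); // still covered by the first cleanup time
        state.onAccess(3_660_000L); // min retention no longer guaranteed
        System.out.println("timers registered: " + state.timersRegistered());
    }
}
```

Under this scheme, accesses that arrive while the scheduled cleanup still satisfies the minimum retention register no timer, so the gap between min and max controls how often a new timer is needed.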



Re: Process Function

Timo Walther
Hi,

I'm actually not very familiar with the current Table API implementations but Fabian or Timo (cc'ed) should know more. I suspect very much that this is implemented like this, yes.

Best,
Aljoscha


Re: Process Function

Johannes Schulte
Thanks, that helped to see how we could implement this!

On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther <[hidden email]> wrote:
Hi,

I'm actually not very familiar with the current Table API implementations but Fabian or Timo (cc'ed) should know more. I suspect very much that this is implemented like this, yes.

Best,
Aljoscha
