Hi All,
I have a streaming pipeline that is keyed by userid and then passed to a flatMap function. I need to clear the state after some time, and I was looking at ProcessFunction for this. If I register a timer inside the processElement() function, wouldn't that create a timer for every incoming message?
How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside the onTimer() function so that I know which key has to be evicted? Thanks, Navneeth |
How are you determining that your data is stale? Also, if you want to know the key, why don't you store the key in your state as well? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Navneeth, Currently, I don't think there is any built-in functionality to trigger onTimer() periodically. As for the second part of your question, do you mean that you want to find out which key the fired timer was registered for? I don't think this is possible right now either. I'm CC'ing Aljoscha in case he has more insight on this. Cheers, Gordon On Tue, Sep 5, 2017 at 4:55 PM, Biplob Biswas <[hidden email]> wrote: |
In reply to this post by Navneeth Krishnan
Hi,
You can register a processing-time timer inside onTimer() and the open() function to get a timer that runs periodically. Pseudo-code example:
For the second question, timers are already scoped by key, so you can keep a lastModified variable as a ValueState, then compare it to the timestamp provided by the timer to see whether the current key should be evicted. Check out the example on the ProcessFunction page.
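A minimal sketch of that lastModified check, written as a plain-Python simulation rather than against Flink's API: the dictionaries stand in for per-key ValueState and the set of (timestamp, key) pairs stands in for the timer service. The 2-minute timeout and every name here (KeyedTimeoutSim, process_element, advance_time) are hypothetical. In Flink the key is implicit, because state accessed inside onTimer() is already scoped to the timer's key; the toy passes it explicitly only so it can index its dictionaries.

```python
from typing import Dict, List, Set, Tuple

TIMEOUT_MS = 2 * 60 * 1000  # assumed inactivity timeout: 2 minutes


class KeyedTimeoutSim:
    """Toy stand-in for keyed state plus processing-time timers (not Flink API)."""

    def __init__(self, timeout_ms: int = TIMEOUT_MS) -> None:
        self.timeout_ms = timeout_ms
        self.last_modified: Dict[str, int] = {}    # plays the role of a per-key ValueState
        self.values: Dict[str, object] = {}        # the per-key data we want to evict
        self.timers: Set[Tuple[int, str]] = set()  # one entry per (timestamp, key)
        self.evicted: List[str] = []

    def process_element(self, key: str, value: object, now: int) -> None:
        self.values[key] = value
        self.last_modified[key] = now
        # Like the ProcessFunction docs example: one timer per element.
        self.timers.add((now + self.timeout_ms, key))

    def advance_time(self, now: int) -> None:
        """Fire every timer whose timestamp is <= now, earliest first."""
        for ts, key in sorted(t for t in self.timers if t[0] <= now):
            self.timers.discard((ts, key))
            self.on_timer(key, ts)

    def on_timer(self, key: str, timestamp: int) -> None:
        # Evict only if this timer belongs to the latest element for the key;
        # timers left over from earlier elements see a newer lastModified.
        last = self.last_modified.get(key)
        if last is not None and timestamp == last + self.timeout_ms:
            del self.last_modified[key]
            del self.values[key]
            self.evicted.append(key)


if __name__ == "__main__":
    sim = KeyedTimeoutSim()
    sim.process_element("alice", 1, now=0)
    sim.process_element("bob", 2, now=0)
    sim.process_element("alice", 3, now=60_000)  # alice is still active
    sim.advance_time(120_000)
    print(sim.evicted)  # ['bob']
```

Note that this variant still registers one timer per message, which is exactly the concern raised in the original question; it only shows how the per-key staleness check works.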
Best regards,
Kien
On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
|
Hi,
This is mostly correct, but you cannot register a timer in open() because we don't have an active key there. You can only register a timer in processElement() and onTimer(). In your case, I would suggest somehow clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer. Best, Aljoscha
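A rough sketch of the clamping idea, again as plain Python rather than Flink API. It relies on Flink keeping only one timer per key and timestamp, so rounding every registration up to a 2-minute boundary leaves each user with at most one pending cleanup timer per interval, however many messages arrive. The interval, the timeout, and the helper names (cleanup_slot, distinct_timers) are assumptions for illustration; in a real ProcessFunction the computed slot would be passed to ctx.timerService().registerProcessingTimeTimer(...).

```python
INTERVAL_MS = 2 * 60 * 1000  # assumed cleanup granularity: 2 minutes
TIMEOUT_MS = 2 * 60 * 1000   # assumed inactivity timeout before eviction


def cleanup_slot(now_ms: int, timeout_ms: int = TIMEOUT_MS,
                 interval_ms: int = INTERVAL_MS) -> int:
    """Round (now + timeout) UP to the next multiple of interval_ms.

    Rounding up means the timer never fires before the timeout has elapsed;
    it may fire up to one interval later than strictly necessary.
    """
    deadline = now_ms + timeout_ms
    return -(-deadline // interval_ms) * interval_ms  # integer ceiling


def distinct_timers(events):
    """Distinct (key, timestamp) pairs: the timers a key actually holds when
    repeated registrations for the same key and timestamp collapse into one."""
    return {(key, cleanup_slot(ts)) for key, ts in events}


if __name__ == "__main__":
    # 1000 messages for one user spread over a single 2-minute window.
    events = [("alice", t) for t in range(1, 120_001, 120)]
    print(len(events), distinct_timers(events))  # 1000 {('alice', 240000)}
```

Rounding up rather than down matters: a timer rounded down could fire before the key is actually idle, and if no further message arrives nothing would re-register it. onTimer() should still check `timestamp >= lastModified + timeout`, because a later message may have moved the key's deadline into a later slot.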
|
Thanks a lot, everyone. I have the user data ingested from Kafka, and it is keyed by userid. There are around 80 parallel flatMap operator instances after the keyBy, and there are a few million users. The map state has userid as its key and some value. I guess I will try the approach that Aljoscha mentioned and see how it works. On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek <[hidden email]> wrote:
|
In reply to this post by Aljoscha Krettek
Hi,
I'm actually not very familiar with the current Table API implementation, but Fabian or Timo (cc'ed) should know more. I strongly suspect that it is implemented like this, yes. Best, Aljoscha
|
Hi Johannes,
you can find the implementation of the state cleanup here: https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala and an example usage here: https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala Regards, Timo On 06.09.17 at 10:50, Aljoscha Krettek wrote:
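The linked ProcessFunctionWithCleanupState class appears to work with a minimum and a maximum idle-state retention time plus one stored cleanup timestamp per key. The sketch below is a simplified, hypothetical Python re-creation of that idea, not a port of the Scala code: a new timer is registered only when the currently scheduled cleanup would come sooner than the minimum retention allows, and a firing timer clears state only if it is still the key's latest scheduled cleanup. The class and method names are invented for illustration.

```python
from typing import Dict, Set, Tuple


class RetentionCleanup:
    """Toy per-key cleanup with min/max retention (not Flink API)."""

    def __init__(self, min_retention_ms: int, max_retention_ms: int) -> None:
        if max_retention_ms < min_retention_ms:
            raise ValueError("max retention must be >= min retention")
        self.min_ms = min_retention_ms
        self.max_ms = max_retention_ms
        self.cleanup_time: Dict[str, int] = {}     # per-key "ValueState": scheduled cleanup
        self.timers: Set[Tuple[int, str]] = set()  # registered (timestamp, key) timers
        self.state: Dict[str, object] = {}         # the per-key data being retained

    def on_element(self, key: str, value: object, now: int) -> None:
        self.state[key] = value
        current = self.cleanup_time.get(key)
        # Re-register only if the scheduled cleanup would violate min retention.
        if current is None or now + self.min_ms > current:
            new_time = now + self.max_ms
            self.timers.add((new_time, key))
            self.cleanup_time[key] = new_time

    def fire_timers(self, now: int) -> None:
        for ts, key in sorted(t for t in self.timers if t[0] <= now):
            self.timers.discard((ts, key))
            # Ignore timers superseded by a later registration for this key.
            if self.cleanup_time.get(key) == ts:
                del self.cleanup_time[key]
                self.state.pop(key, None)
```

For example, with `RetentionCleanup(60_000, 120_000)`, messages for one key at 0, 30000 and 70000 ms register only two timers (120000 and 190000); the first is ignored when it fires, and the second clears the key. A continuously active key gets a new timer at most once per `max - min` milliseconds, which bounds the timer count without a separate periodic task.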
|
Thanks, that helped us see how we could implement this! On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther <[hidden email]> wrote:
|