Hi,
I want to do windowed processing in each function when using Stateful Functions. Is this possible? Some pseudocode would be very helpful!

Some more context: I have a stream of edges as input. I want to window these and save the graph representation (as an edge list, adjacency list, or CSR) in a distributed way using state. Since doing this for the entire edge stream would cost far too much memory, I want to keep only the graph state within the window. How could I achieve this?

Thanks!
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
I think this is also related to my question about CEP in StateFun.

On Wed, May 6, 2020 at 4:13 PM Annemarie Burger <[hidden email]> wrote:
Hello Tez,
With all due respect, I doubt your answer is related to the question. Just to rephrase a bit: assuming we use SF for our application, how can we apply window logic when a function does some processing? Is there a proper way?

@Igal: we would very much appreciate your answer. :)

Best,
Max
Oops – will follow the thread 🙊

On Wed, May 6, 2020 at 5:37 PM m@xi <[hidden email]> wrote:
Hi all,

Data stream windows are not yet supported in StateFun, but it seems like the main motivation here is to purge old edges? If that is the case, perhaps we need to integrate state TTL [1] into PersistedValue/PersistedTable.

An alternative approach would be to implement a tumbling window per vertex (a stateful function instance) by sending a delayed message to itself [2]. When that specific delayed message arrives, you would purge the oldest edges by examining the edges in state.

I hope it helps,
Igal.

On Wednesday, May 6, 2020, Oytun Tez <[hidden email]> wrote:
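A minimal Python sketch of the per-vertex tumbling window described above. The assumptions are deliberately loud: `ToyRuntime`, `send_delayed`, `TumblingVertexFunction` and the integer clock are hypothetical stand-ins, not the StateFun SDK API. In StateFun, the delayed message would be sent through the function's context and the edges would live in persisted state. Here the function keeps at most one delayed message to itself in flight. When that message arrives at the window boundary, the function examines its edges, hands the closed window's edges on for processing, and purges them.

```python
import heapq


class ToyRuntime:
    """Hypothetical stand-in for the StateFun runtime: a simulated integer clock
    that delivers delayed self-messages in time order."""

    def __init__(self):
        self.now = 0
        self._pending = []  # heap of (delivery_time, seq, function, payload)
        self._seq = 0       # tie-breaker so the heap never compares functions

    def send_delayed(self, fn, delay, payload):
        heapq.heappush(self._pending, (self.now + delay, self._seq, fn, payload))
        self._seq += 1

    def advance_to(self, t):
        # Deliver every delayed message due at or before t, then move the clock to t.
        while self._pending and self._pending[0][0] <= t:
            self.now, _, fn, payload = heapq.heappop(self._pending)
            fn.on_window_end(payload)
        self.now = t


class TumblingVertexFunction:
    """Hypothetical per-vertex function whose state only holds the open window's edges."""

    def __init__(self, runtime, window_size):
        self.rt = runtime
        self.window_size = window_size
        self.edges = []           # (insertion_time, src, dst); stand-in for persisted state
        self.closed_windows = []  # (window_end, [(src, dst), ...]) for downstream processing
        self.timer_armed = False

    def on_edge(self, src, dst):
        self.edges.append((self.rt.now, src, dst))
        if not self.timer_armed:
            # Send ourselves a single delayed message for the end of the current window.
            window_end = (self.rt.now // self.window_size + 1) * self.window_size
            self.rt.send_delayed(self, window_end - self.rt.now, window_end)
            self.timer_armed = True

    def on_window_end(self, window_end):
        # Examine the edges in state: those inserted before window_end belong to the
        # window that just closed. Hand them on for processing, then purge them.
        self.closed_windows.append(
            (window_end, [(s, d) for ts, s, d in self.edges if ts < window_end]))
        self.edges = [e for e in self.edges if e[0] >= window_end]
        self.timer_armed = False  # the next incoming edge re-arms the timer
```

For example, with `window_size=10`, edges inserted at times 1 and 4 are emitted together when the message for time 10 arrives. An edge inserted at time 12 then arms a new message and is emitted in the window ending at 20.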
Dear Igal,
Very insightful answer. Thanks.
Indeed, delayed asynchronous messages are a workaround for simulating a tumbling window in SF. I believe you assume that a message received by a stateful function contains multiple edges, i.e., edges that can all be delayed by a certain amount of time. Therefore, when a function receives a message, it purges all of its existing edges and incorporates the new (delayed) ones. Correct? Nevertheless, if you think about it, the delay is essentially the window slide. Now, what about the window range?

I was not aware of TTL; very interesting and handy. Essentially, TTL can enforce the window range, i.e., attach to each tuple received by a stateful function its lifespan/duration. So the first TTL attribute sets the range: StateTtlConfig.newBuilder(Time.seconds(windowRange)). Therefore, by combining TTL and SF delayed messaging, we can simulate sliding-window processing on a per-stateful-function basis.

However, TTL is a Flink construct and I am not sure I got it right. You said that, if purging old edges is the goal, perhaps state TTL should be integrated into persisted values/tables. If that is the case, then I believe it would be great to integrate TTL into PersistedValue/PersistedTable.
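The TTL-plus-delayed-message combination can be sketched in Python as follows, under stated assumptions. `SlidingWindowVertex` and `run` are hypothetical names, and time is a simulated integer clock. The TTL is emulated by filtering on each edge's insertion time; this is not Flink's `StateTtlConfig`. The periodic delayed message is modeled by `run` calling `on_tick` every `window_slide` time units. At each tick, the edges that survive the emulated TTL of `window_range` are exactly the sliding window (now - range, now].

```python
from dataclasses import dataclass, field


@dataclass
class SlidingWindowVertex:
    """Hypothetical per-vertex function: an emulated TTL of `window_range` bounds
    the state, and a periodic self-addressed delayed message every `window_slide`
    time units triggers the window evaluation."""

    window_range: int
    window_slide: int
    edges: list = field(default_factory=list)    # (insertion_time, src, dst)
    results: list = field(default_factory=list)  # (tick_time, edges in window)

    def on_edge(self, now, src, dst):
        self.edges.append((now, src, dst))

    def on_tick(self, now):
        # Emulated TTL: an edge inserted at t expires once t + window_range <= now.
        self.edges = [e for e in self.edges if e[0] + self.window_range > now]
        # The survivors are exactly the sliding window (now - range, now].
        self.results.append((now, [(s, d) for _, s, d in self.edges]))
        # In StateFun, the function would now send itself the next delayed
        # message with a delay of window_slide; here we return its due time.
        return now + self.window_slide


def run(vertex, events, until):
    """Stand-in for the runtime: feeds (time, src, dst) events in time order and
    fires a tick every window_slide up to `until`. An edge arriving at a tick's
    exact time is inserted before that tick fires."""
    next_tick = vertex.window_slide
    for t, src, dst in sorted(events):
        while next_tick < t:
            next_tick = vertex.on_tick(next_tick)
        vertex.on_edge(t, src, dst)
    while next_tick <= until:
        next_tick = vertex.on_tick(next_tick)
```

With a range of 10 and a slide of 5, an edge inserted at time 1 is part of the windows evaluated at times 5 and 10 and has expired by the evaluation at time 15. The slide controls how often the function is woken up, while the range alone bounds how much state it keeps.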
Hi,

One way to keep the state size under control would be:

1) For every incoming edge, store its "insertion time" in the vertex function's state.
2) In addition, the vertex function would send itself a delayed message with a delay equal to the expiration duration, so that it arrives at insertion time + expiration duration.
3) Once a delayed message arrives, iterate over your edge state and remove all the edges whose "insertion time" + expiration duration <= now().

To reduce the number of delayed messages, you can make sure to send only a single delayed message per fixed expiration interval (a.k.a. a tumbling window).

A better way to deal with this would be to wait until [1] is implemented in StateFun (I don't believe it should take more than a couple of weeks). Then you can simply define your state with an expiration, and StateFun will make sure that the edge state is purged automatically some configured time after insertion.

I hope this helps,
Good luck!
Igal.

On Fri, May 8, 2020 at 1:00 PM m@xi <[hidden email]> wrote:
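The three steps can be sketched in Python as below. Each vertex's out-edges are stored as an adjacency map from neighbor to insertion time, one of the representations asked about at the start of the thread. Everything here is hypothetical. `ToyRuntime` is a simulated integer clock standing in for StateFun's delayed messaging, and `VertexFunction` is not an SDK class. The removal condition is insertion time + expiration <= now. The batching is a variant of the idea above: the function keeps at most one pending delayed message and re-arms it for the earliest remaining expiry. Re-inserting an existing edge refreshes its insertion time; that is a design choice of this sketch.

```python
import heapq


class ToyRuntime:
    """Hypothetical stand-in for StateFun: a simulated integer clock that delivers
    delayed self-messages in time order."""

    def __init__(self):
        self.now = 0
        self._pending = []  # heap of (delivery_time, seq, function)
        self._seq = 0       # tie-breaker so the heap never compares functions

    def send_delayed(self, fn, delay):
        heapq.heappush(self._pending, (self.now + delay, self._seq, fn))
        self._seq += 1

    def advance_to(self, t):
        # Deliver every delayed message due at or before t, then move the clock to t.
        while self._pending and self._pending[0][0] <= t:
            self.now, _, fn = heapq.heappop(self._pending)
            fn.on_expiry_message()
        self.now = t


class VertexFunction:
    """Hypothetical per-vertex function; out-edges kept as neighbor -> insertion time."""

    def __init__(self, runtime, expiration):
        self.rt = runtime
        self.expiration = expiration
        self.neighbors = {}           # adjacency map; stand-in for a persisted table
        self.message_pending = False  # at most one delayed message in flight

    def on_edge(self, dst):
        # Step 1: record the edge's insertion time (re-inserting refreshes it).
        self.neighbors[dst] = self.rt.now
        # Step 2 (batched): only send a delayed message if none is pending.
        if not self.message_pending:
            self.rt.send_delayed(self, self.expiration)
            self.message_pending = True

    def on_expiry_message(self):
        now = self.rt.now
        # Step 3: remove every edge whose insertion time + expiration <= now.
        self.neighbors = {d: ts for d, ts in self.neighbors.items()
                          if ts + self.expiration > now}
        self.message_pending = False
        if self.neighbors:
            # Re-arm a single message for the earliest remaining expiry.
            earliest_expiry = min(self.neighbors.values()) + self.expiration
            self.rt.send_delayed(self, earliest_expiry - now)
            self.message_pending = True
```

With `expiration=10`, edges inserted at times 1 and 5 share one delayed message. When it arrives at time 11, the edge from time 1 is removed and a new message is armed for time 15, when the second edge expires.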