Window processing in Stateful Functions

Annemarie Burger
Hi,

I want to do windowed processing in each function when using Stateful
Functions. Is this possible? Some pseudocode would be very helpful!

Some more context: I have a stream of edges as input. I want to window
these and store the graph representation (either as an edge list, adjacency
list, or CSR) in a distributed way using state. Since doing this for the
entire edge stream would cost far too much memory, I only want to keep the
state of the graph within the window. How could I achieve this?

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Window processing in Stateful Functions

Oytun Tez
I think this is also related to my question about CEP in Statefun.

[hidden email], I was looking into using the Siddhi library within the function's context.



 --

MotaWord
Oytun Tez
M O T A W O R D CTO & Co-Founder
[hidden email]
     


(In reply to Annemarie Burger, Wed, May 6, 2020 at 4:13 PM)

Re: Window processing in Stateful Functions

m@xi
Hello Tez,

With all due respect, I doubt your answer is related to the question.

Just to rephrase a bit: assuming we use SF for our application, how can we
apply window logic when a function does some processing? Is there a proper
way?


@Igal: we would very much appreciate your answer. :)

Best,
Max






Re: Window processing in Stateful Functions

Oytun Tez
Oops – will follow the thread 🙊


(In reply to m@xi, Wed, May 6, 2020 at 5:37 PM)

Re: Window processing in Stateful Functions

Igal Shilman
Hi all,

Data stream windows are not yet supported in StateFun, but it seems like the main motivation here
is to purge old edges?
If this is the case, perhaps we need to integrate state TTL [1] into persisted values/persisted tables.

An alternative approach would be to implement a tumbling window per vertex (a stateful function instance)
by having the function send a delayed message to itself [2]. When that specific delayed message arrives, you would
purge the oldest edges by examining the edges in state.
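The delayed-message workaround can be sketched without the StateFun SDK. What follows is a minimal, self-contained Java simulation under stated assumptions, and several parts of it are hypothetical:

- the class name TumblingVertex;
- the window size;
- the epoch-aligned window boundaries;
- the in-memory priority queue that stands in for StateFun's delayed self-messages.

Each vertex keeps only the edges of its current tumbling window and arms one delayed "tick" for the end of that window. When the tick arrives, every edge inserted before it is purged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical vertex function keeping only the edges of the current tumbling window.
// Fields stand in for persisted state; the priority queue stands in for StateFun's
// delayed self-messages. Timestamps are assumed to be non-negative longs.
class TumblingVertex {
    static final class TumblingEdge {
        final String src, dst;
        final long insertedAt;
        TumblingEdge(String src, String dst, long insertedAt) {
            this.src = src;
            this.dst = dst;
            this.insertedAt = insertedAt;
        }
    }

    final long windowSize;
    final List<TumblingEdge> edges = new ArrayList<>();
    boolean tickPending = false;
    final PriorityQueue<Long> delayedTicks = new PriorityQueue<>();

    TumblingVertex(long windowSize) { this.windowSize = windowSize; }

    // Simulated runtime: deliver every delayed tick due at or before `now`.
    void advanceTo(long now) {
        while (!delayedTicks.isEmpty() && delayedTicks.peek() <= now) {
            onTick(delayedTicks.poll());
        }
    }

    void onEdge(String src, String dst, long now) {
        advanceTo(now);
        edges.add(new TumblingEdge(src, dst, now));
        if (!tickPending) {
            // End of the aligned window [k * size, (k + 1) * size) that contains `now`;
            // equivalent to sending ourselves a message delayed by (windowEnd - now).
            long windowEnd = (now / windowSize + 1) * windowSize;
            delayedTicks.add(windowEnd);
            tickPending = true;
        }
    }

    // The window has closed: purge every edge inserted before its end.
    void onTick(long tickTime) {
        edges.removeIf(e -> e.insertedAt < tickTime);
        tickPending = false;
    }

    public static void main(String[] args) {
        TumblingVertex v = new TumblingVertex(10);
        v.onEdge("a", "b", 3);
        v.onEdge("a", "c", 7);
        v.onEdge("a", "d", 12); // the tick at 10 is delivered first and purges b and c
        System.out.println(v.edges.size()); // prints 1
    }
}
```

Only one tick is pending per vertex at a time. As a result, the number of delayed messages grows with the number of windows rather than with the number of edges.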

I hope it helps,
Igal.




(In reply to Oytun Tez, Wednesday, May 6, 2020)

Re: Window processing in Stateful Functions

m@xi
Dear Igal,

Very insightful answer, thanks.

Igal Shilman wrote:
> An alternative approach would be to implement a tumbling window per vertex (a stateful function instance) by sending to itself a delayed message [2]. When that specific delayed message arrives you would have to purge the oldest edges by examining the edges in state.

Indeed, delayed asynchronous messages are a workaround for simulating tumbling windows in SF. I believe you assume that a message received by a stateful function contains multiple edges, all of which can be delayed by a certain amount of time. So, when a function receives such a message, it purges all of its existing edges and incorporates the new (delayed) ones. Correct? If you think about it, the delay is then essentially the window slide. But what about the window range?

Igal Shilman wrote:
> Data stream windows are not yet supported in statefun, but it seems like the main motivation here is to purge old edges? If this is the case perhaps we need to integrate state TTL [1] into persisted values/persistedtables.

I was not aware of state TTL; it is very interesting and handy. Essentially, TTL can enforce the window range, i.e., attach to each tuple received by a stateful function its lifespan. So the TTL passed to the builder sets the range: StateTtlConfig.newBuilder(Time.seconds(windowRange)). Therefore, by combining TTL (for the range) with StateFun delayed messaging (for the slide), we could simulate sliding-window processing on a per-function basis. However, TTL is a Flink construct, and I am not sure I understood it correctly. You said:
> If this is the case perhaps we need to integrate state TTL [1] into persisted values/persistedtables.

If this is the case, then I believe it would be great to integrate TTL into persisted values/tables.
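The combination proposed above can be sketched in Java: an expiration enforces the range, and a recurring delayed message enforces the slide. This is a hypothetical simulation, not Flink's StateTtlConfig or the StateFun API, and it makes two simplifications:

- Expiration is emulated by comparing each edge's insertion time against the range when the slide message arrives.
- The recurring message is simulated by calling onSlide at successive slide boundaries.

Each evaluation returns an adjacency-list view of the edges that this function instance received within the last range time units.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical vertex function simulating a sliding window: expiration enforces the
// range, a recurring delayed "slide" message triggers evaluation. Not the StateFun API.
class SlidingVertex {
    static final class TimedEdge {
        final String src, dst;
        final long insertedAt;
        TimedEdge(String src, String dst, long insertedAt) {
            this.src = src;
            this.dst = dst;
            this.insertedAt = insertedAt;
        }
    }

    final long range, slide;
    final List<TimedEdge> edges = new ArrayList<>(); // stands in for persisted state
    long nextSlideAt = -1;                           // when the next slide message is due

    SlidingVertex(long range, long slide) {
        this.range = range;
        this.slide = slide;
    }

    void onEdge(String src, String dst, long now) {
        edges.add(new TimedEdge(src, dst, now));
    }

    // Handles the delayed slide message arriving at `now`: drops edges older than
    // `range` (emulating expiration), re-arms the next slide, and returns the window
    // (now - range, now] as an adjacency list.
    Map<String, TreeSet<String>> onSlide(long now) {
        edges.removeIf(e -> e.insertedAt <= now - range);
        nextSlideAt = now + slide; // a real function would send itself a delayed message here
        Map<String, TreeSet<String>> adjacency = new TreeMap<>();
        for (TimedEdge e : edges) {
            adjacency.computeIfAbsent(e.src, k -> new TreeSet<>()).add(e.dst);
        }
        return adjacency;
    }

    public static void main(String[] args) {
        SlidingVertex v = new SlidingVertex(10, 5); // range 10, slide 5
        v.onEdge("a", "b", 1);
        v.onEdge("a", "c", 6);
        v.onEdge("b", "c", 9);
        System.out.println(v.onSlide(10)); // prints {a=[b, c], b=[c]}
        System.out.println(v.onSlide(15)); // prints {a=[c], b=[c]}
    }
}
```

Suppose onSlide is called at every multiple of the slide, with range 10 and slide 5. Each edge then takes part in exactly two consecutive evaluations. That overlap is what distinguishes a sliding window from a tumbling one.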


Re: Window processing in Stateful Functions

Igal Shilman
Hi,
One way to keep the state size under control would be:
1) for every incoming edge, store its "insertion time" alongside it in the vertex function's state;
2) in addition, have the vertex function send itself a delayed message whose delay is the expiration duration, so that it arrives at insertion time + expiration duration;
3) once a delayed message arrives, iterate over your edge state and remove all edges whose "insertion time" + expiration duration <= now().
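The three steps above can be sketched as a self-contained Java simulation. The class and method names are hypothetical. Delayed messages are simulated with an in-memory priority queue rather than StateFun's messaging API. The sketch maps onto the steps as follows:

- Step 1: each edge is stored with its insertion time.
- Step 2: one delayed message per edge is scheduled to arrive after the expiration duration.
- Step 3: when a message arrives, every edge whose insertion time plus expiration duration is at or before the current time is removed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical vertex function that expires each edge a fixed time after insertion,
// using one (simulated) delayed message per edge. Not the StateFun API.
class ExpiringEdgesVertex {
    static final class StoredEdge {
        final String dst;
        final long insertedAt; // step 1: insertion time stored with the edge
        StoredEdge(String dst, long insertedAt) {
            this.dst = dst;
            this.insertedAt = insertedAt;
        }
    }

    final long expiration;
    final List<StoredEdge> edges = new ArrayList<>();          // persisted edge state
    final PriorityQueue<Long> delayed = new PriorityQueue<>(); // simulated delayed messages
    int delayedMessagesSent = 0;

    ExpiringEdgesVertex(long expiration) { this.expiration = expiration; }

    // Simulated runtime: deliver delayed messages due at or before `now`.
    void advanceTo(long now) {
        while (!delayed.isEmpty() && delayed.peek() <= now) {
            onExpiryMessage(delayed.poll());
        }
    }

    void onEdge(String dst, long now) {
        advanceTo(now);
        edges.add(new StoredEdge(dst, now));
        delayed.add(now + expiration); // step 2: delay = expiration duration
        delayedMessagesSent++;
    }

    // Step 3: remove every edge whose insertion time + expiration has been reached.
    void onExpiryMessage(long now) {
        edges.removeIf(e -> e.insertedAt + expiration <= now);
    }

    public static void main(String[] args) {
        ExpiringEdgesVertex v = new ExpiringEdgesVertex(10);
        v.onEdge("b", 0);
        v.onEdge("c", 4);
        v.advanceTo(10); // the message for "b" arrives; "c" expires only at 14
        System.out.println(v.edges.size()); // prints 1
    }
}
```

This variant purges each edge promptly, at the cost of one delayed message per edge.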

To reduce the number of delayed messages, you can send a single delayed message once per fixed expiration interval
(a.k.a. a tumbling window).
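This optimization can be sketched by keeping at most one pending delayed message per vertex, scheduled for the next boundary of a fixed interval. The sketch is a hypothetical Java simulation: the names are made up, and a field stands in for the single outstanding delayed message. The per-edge expiration check is the same as in the three steps above. The trade-off is that an edge may outlive its expiration duration by up to one interval, because it is only purged when the next tick arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical vertex function that keeps per-edge expiration but has at most one
// pending (simulated) delayed message, fired on fixed interval boundaries.
class CoalescedExpiryVertex {
    static final class StoredEdge {
        final String dst;
        final long insertedAt;
        StoredEdge(String dst, long insertedAt) {
            this.dst = dst;
            this.insertedAt = insertedAt;
        }
    }

    final long expiration, interval;
    final List<StoredEdge> edges = new ArrayList<>(); // persisted edge state
    Long pendingTickAt = null;                        // persisted: the single outstanding tick
    int delayedMessagesSent = 0;

    CoalescedExpiryVertex(long expiration, long interval) {
        this.expiration = expiration;
        this.interval = interval;
    }

    // Simulated runtime: deliver the pending tick (repeatedly) while it is due.
    void advanceTo(long now) {
        while (pendingTickAt != null && pendingTickAt <= now) {
            long tickTime = pendingTickAt;
            pendingTickAt = null;
            onTick(tickTime);
        }
    }

    void onEdge(String dst, long now) {
        advanceTo(now);
        edges.add(new StoredEdge(dst, now));
        armTickIfNeeded(now);
    }

    void onTick(long now) {
        edges.removeIf(e -> e.insertedAt + expiration <= now);
        if (!edges.isEmpty()) {
            armTickIfNeeded(now); // keep ticking only while there is state left to expire
        }
    }

    // Schedule a tick for the next interval boundary unless one is already pending.
    private void armTickIfNeeded(long now) {
        if (pendingTickAt == null) {
            pendingTickAt = (now / interval + 1) * interval;
            delayedMessagesSent++;
        }
    }

    public static void main(String[] args) {
        CoalescedExpiryVertex v = new CoalescedExpiryVertex(10, 5);
        for (int t = 0; t < 5; t++) {
            v.onEdge("n" + t, t); // five edges, but only one delayed message so far
        }
        v.advanceTo(15);
        System.out.println(v.edges.size() + " " + v.delayedMessagesSent); // prints 0 3
    }
}
```

In the example, five edges inserted within one interval cost three delayed messages in total instead of five. The edge inserted at time 1 (expiring at 11) is only removed by the tick at 15.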

A better way to deal with this would be to wait until [1] is implemented in StateFun (I don't think it should take more than a couple of weeks).
Then you can simply define your state with an expiration, and StateFun will make sure that the edge state is purged automatically a configured time
after insertion.
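The benefit of declaring expiration on the state itself can be illustrated with a small, hypothetical stand-in for an expiring persisted table. ExpiringTable is not a StateFun class; it is an assumption for illustration, and the current time is passed in explicitly to keep it deterministic. Every write records a timestamp. Reads treat entries older than the configured time-to-live as absent and drop them, so the vertex function no longer needs delayed messages for cleanup.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical expire-after-write table, illustrating declarative state expiration.
// This is NOT StateFun's PersistedTable; the current time is passed in explicitly.
class ExpiringTable<K, V> {
    private static final class Timestamped<T> {
        final T value;
        final long writtenAt;
        Timestamped(T value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final long ttl;
    private final Map<K, Timestamped<V>> entries = new HashMap<>();

    ExpiringTable(long ttl) { this.ttl = ttl; }

    void set(K key, V value, long now) {
        entries.put(key, new Timestamped<>(value, now));
    }

    // Expired entries are never returned, and are dropped when encountered.
    Optional<V> get(K key, long now) {
        Timestamped<V> e = entries.get(key);
        if (e == null) {
            return Optional.empty();
        }
        if (e.writtenAt + ttl <= now) {
            entries.remove(key);
            return Optional.empty();
        }
        return Optional.of(e.value);
    }

    // Number of live entries at `now`; purges expired entries as a side effect.
    int size(long now) {
        Iterator<Map.Entry<K, Timestamped<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().writtenAt + ttl <= now) {
                it.remove();
            }
        }
        return entries.size();
    }

    public static void main(String[] args) {
        // Hypothetical edge weights of one vertex, keyed by target vertex, expiring 10 units after writing.
        ExpiringTable<String, Double> edges = new ExpiringTable<>(10);
        edges.set("b", 1.0, 0);
        edges.set("c", 2.5, 6);
        System.out.println(edges.size(12));     // prints 1
        System.out.println(edges.get("c", 12)); // prints Optional[2.5]
    }
}
```

In this sketch, expired entries are only physically removed when they are read or counted. A real state backend may clean up on a different schedule.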

I hope this helps,
Good luck!
Igal.



(In reply to m@xi, Fri, May 8, 2020 at 1:00 PM)