Complex graph-based sessionization (potential use for stateful functions)

Complex graph-based sessionization (potential use for stateful functions)

Krzysztof Zarzycki
Hi!  Interesting problem to solve ahead :) 
I need to implement a streaming sessionization algorithm (splitting a stream of events into groups of correlated events). It's pretty non-standard, as we DON'T have a key like a user id that separates the stream into substreams which we would then just need to chunk based on time.
Instead, and simplifying a lot, our events bear tuples that I compare to graph edges, e.g.:
event 1: A -> B
event 2: B -> C
event 3: D -> E
event 4: D -> F
event 5: G -> F
I need to group them into subgroups reachable by following these edges from some specific nodes. E.g. here:
{ A->B, B->C} 
{ D->E, D->F}
{ G->F }
(note: I need to group the events, which are represented by edges here, not the nodes).
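
To make the expected grouping concrete, here is a minimal batch sketch in plain Python (not Flink code). It assumes that the "specific nodes" are the vertices with no incoming edge; that rule and the function name `group_edges_by_root` are illustrative assumptions, not something stated in the post. A streaming version would have to update the groups incrementally as edges arrive.

```python
from collections import defaultdict, deque


def group_edges_by_root(edges):
    """Group directed edges by the root vertex they are reachable from.

    Assumption: a "root" is any vertex that never appears as an edge target.
    """
    outgoing = defaultdict(list)
    has_incoming = set()
    for src, dst in edges:
        outgoing[src].append((src, dst))
        has_incoming.add(dst)

    # dict.fromkeys keeps first-seen order while removing duplicates.
    sources = dict.fromkeys(src for src, _ in edges)
    roots = [v for v in sources if v not in has_incoming]

    sessions = {}
    for root in roots:
        seen = {root}
        queue = deque([root])
        group = []
        while queue:
            vertex = queue.popleft()
            # Each vertex is dequeued once, so each edge is collected once.
            for edge in outgoing[vertex]:
                group.append(edge)
                if edge[1] not in seen:
                    seen.add(edge[1])
                    queue.append(edge[1])
        sessions[root] = group
    return sessions


events = [("A", "B"), ("B", "C"), ("D", "E"), ("D", "F"), ("G", "F")]
for root, group in group_edges_by_root(events).items():
    print(root, group)
```

For the five events above this yields the three groups listed in the post, keyed by A, D and G.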
As far as I understand, to solve this problem I need to leverage the feedback loops/iterations feature in Flink (generally, I believe I need to apply a Bulk Synchronous Parallel approach).

Has anyone seen this kind of sessionization implemented in the wild?
Would you suggest implementing such an algorithm using stateful functions? (AFAIK, they use feedback loops underneath.) Can you suggest how these would be used here?
I know there are some problems with checkpointing when using iterations; does that mean the implementation may experience data loss on stops?

Side comment: I'm not sure which graph algorithm derivative needs to be applied here, but the candidate is transitive closure. 

Thanks for joining the discussion!
Krzysztof

Fwd: Complex graph-based sessionization (potential use for stateful functions)

rmetzger0
Forwarding Seth's answer to the list

---------- Forwarded message ---------
From: Seth Wiesman <[hidden email]>
Date: Tue, Mar 31, 2020 at 4:47 PM
Subject: Re: Complex graph-based sessionization (potential use for stateful functions)
To: Krzysztof Zarzycki <[hidden email]>
Cc: user <[hidden email]>, <[hidden email]>


Hi Krzysztof,

This is a great use case for Stateful Functions. I have actually been considering adding a graph algorithm example to the statefun repo for some time now.

StateFun does use iteration under the hood and provides exactly-once guarantees. In-flight records will never be lost in the case of failure. From a user code perspective, the api offers arbitrary message passing between different functions (stateful virtual actors).

For a rough sketch of what this would look like: you could create a function called `Vertex` that represents a single vertex of the graph. Its state would be the edges, i.e. all vertices reachable from that point. We now have a distributed, fault-tolerant adjacency list. You can implement whatever graph algorithm you like on top of this structure. Walking the graph would just be starting from a point and messaging the vertices stored in state.
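
The idea above can be illustrated with a toy, single-process simulation in plain Python. This is not the StateFun API: `Runtime`, `Vertex`, `walk_from` and the message tuples are hypothetical names made up for this sketch, and the mailbox queue merely stands in for the message passing that StateFun would provide.

```python
from collections import deque


class Vertex:
    """Toy stand-in for a per-vertex function; its state is an adjacency list."""

    def __init__(self, vertex_id):
        self.vertex_id = vertex_id
        self.neighbours = []     # outgoing edges, kept as function state
        self.seen_walks = set()  # walk ids already handled (guards against cycles)

    def handle(self, message, runtime):
        kind, payload = message
        if kind == "add_edge":
            self.neighbours.append(payload)
        elif kind == "walk":
            walk_id = payload
            if walk_id in self.seen_walks:
                return
            self.seen_walks.add(walk_id)
            for target in self.neighbours:
                # Emit the edge to the output, then message the neighbour.
                runtime.egress.append((walk_id, (self.vertex_id, target)))
                runtime.send(target, ("walk", walk_id))


class Runtime:
    """Delivers messages to vertices one at a time, creating them on demand."""

    def __init__(self):
        self.vertices = {}
        self.mailbox = deque()
        self.egress = []

    def send(self, target_id, message):
        self.mailbox.append((target_id, message))

    def run(self):
        while self.mailbox:
            target_id, message = self.mailbox.popleft()
            if target_id not in self.vertices:
                self.vertices[target_id] = Vertex(target_id)
            self.vertices[target_id].handle(message, self)


def walk_from(runtime, root):
    """Start a walk at `root` (the root doubles as the walk id) and collect its edges."""
    runtime.send(root, ("walk", root))
    runtime.run()
    return [edge for walk_id, edge in runtime.egress if walk_id == root]


runtime = Runtime()
for src, dst in [("A", "B"), ("B", "C"), ("D", "E"), ("D", "F"), ("G", "F")]:
    runtime.send(src, ("add_edge", dst))
runtime.run()
print(walk_from(runtime, "D"))
```

Because the root id is reused as the walk id, a second walk from the same root would be ignored by vertices that already saw it; a real implementation would need fresh walk ids or a way to reset that state.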

Just in case you are not aware, the community is currently in the process of releasing the first Apache release of StateFun and it should hopefully be out by the end of this week. Just to say the API is stable and you can start developing on top of it.


--

Seth Wiesman | Solutions Architect

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

m@xi
Hello Robert

Thanks to your reply I discovered Stateful Functions, which I believe is
quite a powerful tool. I have some questions:

1) As you said, "the community is currently in the process of releasing the
first Apache release of StateFun and it should hopefully be out by the end
of this week". Does this mean that it will become available in Maven
Repository?

I ask because I can't find it when searching in
https://mvnrepository.com/artifact/org.apache.flink?sort=newest
nor can I use the API in my IntelliJ project when I add the dependencies to
my POM file.

I thought of downloading the code from
https://ci.apache.org/projects/flink/flink-statefun-docs-master/, compiling
it with *mvn clean package* and then importing the produced jar file into my
IntelliJ project as an External Library. Is this what you would recommend
for now?

2) I came across this tutorial by Stefan on stateful functions
https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that arbitrary
communication between nodes/functions/actors is essentially made possible by
introducing feedback loops to the DAG Flink topology (well, now it has
cycles I guess :P) to simulate the arbitrary messaging defined in the
StateFun topology.

So the message passing is done by "feedback and tuple rerouting" and not
with MPI. Do you think (or have you tested whether) one may *efficiently*
send/receive (potentially large) state, like graph state, which is the use
case of this post? Or is it more suitable for sending control messages
between actors/functions?

Thanks a lot in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

rmetzger0
Hey Max,


Please give us honest feedback on the "onboarding experience" with stateful functions. We are very eager to make the experience as smooth as possible :) 

2) What do you consider large state? The Flink runtime itself can handle large events / messages (1GB should work). I'm not sure about statefun, but I will try to get an answer for you :) 

Best,
Robert


Re: Complex graph-based sessionization (potential use for stateful functions)

Igal Shilman
Hi All,
One way to think about it with StateFun is to represent the graph vertices as stateful function instances. Unlike in other frameworks, an instance of a function does not take any resources while idle, and you can potentially have many millions of those.
The state for each vertex might be a list of adjacent vertices, and potentially a timer so that they won't linger for too long.
You would still have to think about what kind of graph algorithm to apply here.
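
As a rough illustration of the "adjacency list plus timer" state, here is a plain-Python sketch with an explicit clock. The class `VertexStore`, its methods, and the rule that every new edge re-arms the timer are assumptions made for this example; in StateFun the per-vertex state and the timer would be managed by the runtime rather than by a single in-memory dictionary.

```python
class VertexStore:
    """Per-vertex adjacency lists that are dropped after `ttl_seconds` without updates."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.adjacency = {}   # vertex id -> list of adjacent vertex ids
        self.expires_at = {}  # vertex id -> time at which the state is cleared

    def add_edge(self, src, dst, now):
        self.adjacency.setdefault(src, []).append(dst)
        # Assumption: every new edge re-arms the vertex's timer.
        self.expires_at[src] = now + self.ttl

    def fire_timers(self, now):
        """Clear the state of every vertex whose timer is due; return their ids."""
        expired = [v for v, deadline in self.expires_at.items() if deadline <= now]
        for vertex in expired:
            del self.adjacency[vertex]
            del self.expires_at[vertex]
        return expired


store = VertexStore(ttl_seconds=10)
store.add_edge("A", "B", now=0)
store.add_edge("D", "E", now=5)
print(store.fire_timers(now=10))  # A has been idle for 10 seconds, D has not
```

The TTL would effectively define the session gap: once a vertex's timer fires, later edges from that vertex start a new group.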

I hope it helps,
Igal.

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

m@xi
In reply to this post by rmetzger0
Hi Robert,

Thanks a lot for your reply.

1) The statefun packages are now in the Maven repository, so they probably
needed some time to be included there after your official release.

2) Aligned with the topic of the thread, I am referring to the state of
massive graph streams.
    To enable distributed graph processing, one first needs to partition (in
a specific or random way) the incoming edges across
    different processing units, which subsequently should:
        (a) maintain (initialize and update) the part of the graph's state
they are responsible for as the graph stream evolves
        (b) perform the computation needed to answer the distributed graph
processing task.

The (a) part can be done easily with Flink, but (b) probably dictates
accessing partial graph state of one machine from another.
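
A small plain-Python sketch of (a), and of why (b) needs remote access, might look as follows. It assumes edges are hash-partitioned by their source vertex; `owner_of`, `partition_edges` and `remote_lookups` are hypothetical helper names, and `zlib.crc32` is used only to get a hash that is stable across runs.

```python
import zlib
from collections import defaultdict


def owner_of(vertex_id, num_workers):
    """Map a vertex id to the worker that holds its state (stable hash)."""
    return zlib.crc32(vertex_id.encode("utf-8")) % num_workers


def partition_edges(edges, num_workers):
    """(a) Each worker maintains the adjacency lists of the vertices it owns."""
    workers = [defaultdict(list) for _ in range(num_workers)]
    for src, dst in edges:
        workers[owner_of(src, num_workers)][src].append(dst)
    return workers


def remote_lookups(workers):
    """(b) Count edges whose target vertex is owned by a different worker.

    Following such an edge requires reaching state held on another worker.
    """
    num_workers = len(workers)
    count = 0
    for worker_id, adjacency in enumerate(workers):
        for dsts in adjacency.values():
            count += sum(1 for dst in dsts if owner_of(dst, num_workers) != worker_id)
    return count


edges = [("A", "B"), ("B", "C"), ("D", "E"), ("D", "F"), ("G", "F")]
workers = partition_edges(edges, num_workers=3)
print(remote_lookups(workers), "of", len(edges), "edges cross worker boundaries")
```

With a single worker no edge crosses a boundary; with more workers, a hash partitioning like this one will generally leave some edges pointing at state held elsewhere, which is the communication problem described above.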

Based on my experience, to do it in Flink one has to implement the
communication among processing units in a way that is specific to the
graph processing task. So it would be great to decouple (a) from (b), i.e.,
if Stateful Functions allowed accessing/sending graph state using their
primitives for messaging.

Therefore, my questions are the following:

i)   is it possible to have Flink jobs that invoke Stateful Functions
just for state accessing/retrieving/migration?
ii)  if yes, how efficient would it be?
iii) if not, can you sketch *proper* way(s) of state
accessing/retrieving/migration in plain Flink?

Thanks in advance.

Best,
Makis



Re: Complex graph-based sessionization (potential use for stateful functions)

m@xi
In reply to this post by Igal Shilman
Hi Igal,

Thanks a lot for your answer. I believe you are one of the core developers
behind the interesting statefun.

Your suggestion is really nice and as you say, one way is to tailor the
graph processing to the philosophy of SF.

Though, if one vertex is a stateful function, then heavy hitter nodes in a
massive graph will become a bottleneck, since their functions will do way
more computation than other nodes, which have only a few edges.

Nevertheless, can the statefun messages be anything? For instance, could
parts of a vertex/function's adjacency list be sent as well?

Another way is for SF and Flink to work together. Can SF be responsible
solely for the messaging of state in a Flink job?

Thank you in advance.

Best,
Makis




