Superstep-like synchronization of streaming iteration

Christian Lehner
Hi all,


if you don't want to read the wall of text below, in short, I want to
know if it is possible to get a superstep-based iteration on a possibly
unbounded DataStream in Flink in an efficient way and what general
concept(s) of synchronization you would suggest for that.


I would like to write a program that has different vertices (realized
just as Longs for now) in a graph which all store a keyed state and
communicate with each other with messages that arrive in an iterated
stream.

From the outside I would only get messages to add a certain vertex (or,
possibly in the future, delete one; that can be ignored for now), with
some additional information specified by the program. Such a message
can be assumed to have the same form as any other message. Everything
else would happen in an iterated stream keyed by the vertex to which
each message is addressed. In a KeyedProcessFunction (or a
KeyedBroadcastProcessFunction if a BroadcastStream is used for
synchronization), a vertex can send new messages to any other vertex
(or itself) based on the received message(s) and its own current state,
and can also update its state based on the received message(s). The new
messages would then be fed back into the iterated stream. Without
synchronization this works quite well, but it doesn't produce helpful
results for my problem, since the order in which the messages arrive
cannot be guaranteed.

What I would ideally like to have is a Pregel-like superstep-based
iteration which runs on a batch of outside messages (here: vertex
additions) until no more messages are produced and after that repeats
that with the next batch of vertices either infinitely or until there
are no more new messages received. During the execution of each batch
all vertices (including older ones) can be activated again by receiving
a message and the state of each vertex should be preserved throughout
the execution of the program. The problem lies in how I can separate the
messages into supersteps in an iterative partitioned stream, similar to
the iterations in the DataSet API.
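
To make the desired semantics concrete, here is a minimal plain-Java
sketch of them. It does not use any Flink API and says nothing about how
to achieve this on a distributed stream. The names SuperstepSketch,
Message, VertexProgram and runBatch are hypothetical, and the long
payload is only a placeholder for whatever a real message would carry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Single-threaded model of the intended behaviour (hypothetical names, no Flink). */
final class SuperstepSketch {

    /** A message addressed to a vertex; the payload type is an assumption. */
    static final class Message {
        final long target;
        final long payload;

        Message(long target, long payload) {
            this.target = target;
            this.payload = payload;
        }
    }

    /** Vertex logic: sees its inbox and the shared state map, may emit new messages. */
    interface VertexProgram {
        List<Message> compute(long vertexId, Map<Long, Long> state, List<Long> inbox);
    }

    /**
     * Runs one batch of outside messages until no more messages are produced.
     * The state map is owned by the caller, so it is preserved across batches.
     * Returns the number of supersteps that were executed.
     */
    static int runBatch(List<Message> batch, Map<Long, Long> state, VertexProgram program) {
        List<Message> current = new ArrayList<>(batch);
        int supersteps = 0;
        while (!current.isEmpty()) {
            // Group messages by target vertex (the analogue of keying the stream).
            Map<Long, List<Long>> inboxes = new TreeMap<>();
            for (Message m : current) {
                inboxes.computeIfAbsent(m.target, k -> new ArrayList<>()).add(m.payload);
            }
            // Messages produced now become visible only in the next superstep.
            List<Message> next = new ArrayList<>();
            for (Map.Entry<Long, List<Long>> inbox : inboxes.entrySet()) {
                next.addAll(program.compute(inbox.getKey(), state, inbox.getValue()));
            }
            current = next;
            supersteps++;
        }
        return supersteps;
    }
}
```

For example, with a vertex program that stores the smallest value it has
seen and forwards it along the chain 1, 2, 3 only when that value
improves, a batch containing a single message for vertex 1 takes three
supersteps. A later batch that brings no improvement ends after one
superstep, with the earlier state still in place.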

One idea I had was to use tumbling windows of a large enough size,
which would collect all the messages and then emit them in a
ProcessWindowFunction once the window fires. While this would be quite
a simple solution that requires little non-parallel synchronization, it
would obviously require that we know a time span within which all
messages are guaranteed to have been processed and all new messages for
the next superstep produced, which is realistically not the case. It
would also mean that in most supersteps the program would wait longer
than necessary before starting the next superstep. Fault tolerance
would also be very hard to achieve.

Another, more complex idea was to synchronize globally through an
object that remembers which vertices have been sent messages in the
previous superstep. It would be informed before any message is sent and
again when a vertex is done processing a message, and it would tell
that vertex whether there are globally no more messages to be
processed. If that is the case, the vertex sends a NextSuperstep
message, which is broadcast to all partitions with a BroadcastStream.
After that, all vertices can start processing the messages sent to
them in the previous superstep. Apart from not being trivial to
synchronize correctly (which I'm working on myself), this approach has
the obvious disadvantage that a lot of information has to be passed to
this object in a globally synchronized manner, which kind of kills the
point of parallel processing. Although some global synchronization
probably has to take place, this approach seems rather inefficient to
me.
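
As a rough illustration of such a synchronizing object, the following
plain-Java sketch tracks only message counts rather than the set of
addressed vertices. That simplification is an assumption made here to
keep the example short. The class SuperstepCoordinator and its methods
are hypothetical and not something Flink provides, and the sketch
ignores how the object would be reached from parallel tasks.

```java
/** Hypothetical global coordinator; synchronized methods model the global lock. */
final class SuperstepCoordinator {
    private long pending;   // messages of the current superstep not yet processed
    private long buffered;  // messages sent during this superstep, due in the next one
    private long superstep; // number of completed supersteps

    /** Registers the outside messages that start a new batch. */
    synchronized void startBatch(long outsideMessages) {
        pending = outsideMessages;
        buffered = 0;
    }

    /** Called by a vertex before it sends a message. */
    synchronized void beforeSend() {
        buffered++;
    }

    /**
     * Called by a vertex after it has finished processing one message.
     * Returns true if that was the last pending message, in which case
     * the caller should send the NextSuperstep message.
     */
    synchronized boolean afterProcess() {
        if (pending <= 0) {
            throw new IllegalStateException("no message is pending");
        }
        pending--;
        return pending == 0;
    }

    /**
     * Switches to the next superstep and returns how many messages it
     * has to process; 0 means the batch has finished.
     */
    synchronized long nextSuperstep() {
        pending = buffered;
        buffered = 0;
        superstep++;
        return pending;
    }

    synchronized long completedSupersteps() {
        return superstep;
    }
}
```

Even in this reduced form, every send and every processed message
passes through one synchronized object, which is exactly the bottleneck
described above.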

Since I haven't been working with Flink for very long, although I have
used it intensively for the past couple of weeks and read all relevant
documentation I could find, I would like to ask if someone has a
suggestion for how to implement such a superstep-based iteration in the
DataStream API in the most efficient way, and whether you think this is
even a worthwhile endeavor. I would especially like to know if Flink
already provides classes, methods or concepts that would be helpful for
that.

Since our project isn't really close to a finished program yet and
consists mainly of various test programs, I cannot really show you
complete code for what I already have, but if anyone has specific
questions I can probably send pseudocode or the Java code of one of
these test programs to describe what I have in mind. Also, since we are
still relatively open on how exactly we want to solve our original
problem, I'm also open to suggestions that solve only a similar problem,
even if they don't fully fit what I described above.

It's of course also possible that there is already a simple solution in
Flink which I somehow managed to overlook until now. In that case I'm
sorry for bothering you, but I would still like to know what I should
look up.


Best, Christian



Re: Superstep-like synchronization of streaming iteration

Paris Carbone
Hi Christian,

It is great to see iterative use cases, thanks for sharing your problem!

Superstep iterative BSP synchronization for streams is a problem we have been looking into extensively; however, this functionality is not yet standardised in Flink.
I think your use case is fully covered by our proposed approach, described in a research talk at Flink Forward 18 in Berlin [1] (probably there is a video available too at the dataArtisans website).
Take a look, and if this approach satisfies your needs and you would like to test your application with our current prototype, please do PM me!

Paris

[1] https://www.slideshare.net/FlinkForward/flink-forward-berlin-2018-paris-carbone-stream-loops-on-flink-reinventing-the-wheel-for-the-streaming-era

> On 29 Sep 2018, at 20:51, Christian Lehner <[hidden email]> wrote:
> [...]