Question about Writing Incremental Graph Algorithms using Apache Flink Gelly


kaansancak
Hi all,
As I described in my Stack Overflow post (https://stackoverflow.com/questions/61164302/writing-incremental-graph-algorithms-using-apache-flink-gelly), I am trying to write a dynamic graph algorithm using Flink Gelly, but I couldn't find any examples. Most of the examples I found operate on static graphs, whereas I need an incremental use case. I would be grateful if you could point me to some examples; any help is appreciated.

Best
Kaan Sancak

PhD Student
Georgia Tech

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

Tzu-Li (Gordon) Tai
Hi,

As you mentioned, Gelly Graphs are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement the incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, an API recently added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's and Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

Flavio Pompermaier
From what I see, Gelly is not really maintained or used anymore. Do you think it would make sense to deprecate it and write a guide (in the documentation) on how to rewrite a Gelly application as a Statefun one?

In reply to Tzu-Li (Gordon) Tai's message of Tue, Apr 14, 2020 at 5:16 AM.

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

kaansancak
In reply to this post by Tzu-Li (Gordon) Tai
Thanks for the useful information! It sounds like a good and fun idea to experiment with. I will definitely give it a try.

I have a very close upcoming deadline, and I have already implemented the algorithm as a Scatter-Gather iteration.

I have another question: can we chain Scatter-Gather or Vertex-Centric iterations?
Let's say we have an initial batch/dataset; we run a Scatter-Gather iteration on it and obtain a graph.
Using another batch, we add vertices to or delete vertices from the graph we obtained.
Then we run another Scatter-Gather iteration on the modified graph.

This is not streaming, but a naive way to simulate batch updates that are happening concurrently.
Do you think this is a feasible approach?

Best
Kaan

In reply to Tzu-Li (Gordon) Tai's message of Apr 13, 2020, 11:16 PM.

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

Till Rohrmann
Hi Kaan,

I think what you are proposing is something like this:

Graph<Long, Double, Double> graph = ... // get first batch

// runScatterGatherIteration takes a ScatterFunction, a GatherFunction, and a maximum iteration count (omitted here)
Graph<Long, Double, Double> graphAfterFirstSG = graph.runScatterGatherIteration(...);

Graph<Long, Double, Double> secondBatch = ... // get second batch

// Adjust the result of the SG iteration with secondBatch:
// union(...) to add its vertices and edges, difference(...) to remove them
Graph<Long, Double, Double> updatedGraph = graphAfterFirstSG.union(secondBatch);

updatedGraph.runScatterGatherIteration(...);

Then I believe this should work.

Cheers,
Till

In reply to Kaan Sancak's message of Wed, Apr 15, 2020 at 1:14 AM.

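The chaining pattern Till outlines can be tried out without a cluster. The following is a minimal plain-Java sketch, not Gelly code: it simulates scatter-gather supersteps for single-source shortest paths, merges a second batch into the result, and runs the iteration again starting from the first result. The class `ChainedScatterGather`, its `Edge` type, the method `run`, and the example graph are all hypothetical and introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for chaining two scatter-gather runs; this is not Gelly code. */
class ChainedScatterGather {

    /** Directed weighted edge (hypothetical stand-in for a Gelly edge). */
    static final class Edge {
        final long src;
        final long dst;
        final double weight;

        Edge(long src, long dst, double weight) {
            this.src = src;
            this.dst = dst;
            this.weight = weight;
        }
    }

    /**
     * Runs shortest-path supersteps until no value changes or maxIterations is reached.
     * The input map is copied, not modified. Simplification: every vertex with a finite
     * distance scatters in every superstep, not only the vertices updated in the previous one.
     */
    static Map<Long, Double> run(Map<Long, Double> values, List<Edge> edges, int maxIterations) {
        Map<Long, Double> current = new HashMap<>(values);
        for (int i = 0; i < maxIterations; i++) {
            // Scatter: offer (own distance + edge weight) to each target, keeping the minimum offer.
            Map<Long, Double> inbox = new HashMap<>();
            for (Edge e : edges) {
                double d = current.getOrDefault(e.src, Double.POSITIVE_INFINITY);
                if (d < Double.POSITIVE_INFINITY) {
                    inbox.merge(e.dst, d + e.weight, Math::min);
                }
            }
            // Gather: a vertex adopts an offer only if it improves its current value.
            boolean changed = false;
            for (Map.Entry<Long, Double> offer : inbox.entrySet()) {
                double old = current.getOrDefault(offer.getKey(), Double.POSITIVE_INFINITY);
                if (offer.getValue() < old) {
                    current.put(offer.getKey(), offer.getValue());
                    changed = true;
                }
            }
            if (!changed) {
                break;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // First batch: 1 -> 2 -> 3, with vertex 1 as the source.
        Map<Long, Double> vertices = new HashMap<>();
        vertices.put(1L, 0.0);
        vertices.put(2L, Double.POSITIVE_INFINITY);
        vertices.put(3L, Double.POSITIVE_INFINITY);
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(1, 2, 4.0));
        edges.add(new Edge(2, 3, 4.0));
        Map<Long, Double> afterFirst = run(vertices, edges, 10);

        // Second batch: one new vertex and two new edges merged into the first result.
        Map<Long, Double> merged = new HashMap<>(afterFirst);
        merged.put(4L, Double.POSITIVE_INFINITY);
        edges.add(new Edge(1, 4, 1.0));
        edges.add(new Edge(4, 3, 1.0));
        Map<Long, Double> afterSecond = run(merged, edges, 10);

        System.out.println(afterFirst.get(3L) + " -> " + afterSecond.get(3L)); // prints 8.0 -> 2.0
    }
}
```

Starting the second run from the first result is only safe in this sketch because the second batch adds edges, so distances can only shrink. After deletions the old values could be stale and would have to be reset before re-running.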

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

kaansancak
Thanks, that is working now!

I have one last question.

Going one step further, I have changed the vertex value type to a POJO class. The structure is similar to this:

class LocalStorage {
    Integer id;
    Long degree;
    Boolean active;
    List<Long> labels;
    Map<Long, Long> neighborDegree;

    ….
}

During execution, I got an error saying `org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.util.ArrayList>) cannot be used as key`.

After some reading, I implemented the Value and Comparable interfaces. But now, after the scatter phase ends, the LocalStorage values do not seem to persist. For example, I set the `active` flag to true in the scatter phase, but during the gather phase the `active` flag appears to be null. (Note that I already know the degree and id are accessible within the vertex context; I am just trying to see what I can do with the framework.)

I am guessing this is a serialization/deserialization issue. I did some digging online and searched GitHub, but I couldn't find a fix. I have tried some approaches suggested on SO, but they didn't work for me. Is this problem related to my POJO class having list and map fields? Are they supported?

It would be great if someone could point out a similar example or an easy fix.

Best
Kaan

In reply to Till Rohrmann's message of Apr 15, 2020, 11:07 AM.

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

Till Rohrmann
Hi Kaan,

I'm not entirely sure what's going wrong without a minimal code example that reproduces the problem. If you could provide us with one, that would allow us to look into it.

Cheers,
Till

In reply to Kaan Sancak's message of Wed, Apr 15, 2020 at 6:59 PM.

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

kaansancak
Thanks for the reply. It turns out that my serializer was writing one of the fields incorrectly.
I fixed it, and everything seems to be working correctly for now.


Best
Kaan


In reply to Till Rohrmann's message of Apr 16, 2020, 3:05 AM.

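A field that is written and read back inconsistently is easy to catch with a round-trip check outside of Flink. The sketch below is a stand-in under stated assumptions, not the code from this thread: it uses `java.io.DataOutput` and `java.io.DataInput` (the interfaces that Flink's `DataOutputView` and `DataInputView` extend) instead of implementing Flink's `Value` interface, it assumes no field is ever null, and the helper `roundTrip` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LocalStorageRoundTrip {

    /** Same fields as the LocalStorage class in the thread; defaults keep every field non-null. */
    static class LocalStorage {
        Integer id = 0;
        Long degree = 0L;
        Boolean active = false;
        List<Long> labels = new ArrayList<>();
        Map<Long, Long> neighborDegree = new HashMap<>();

        /** Writes every field in a fixed order; collections are prefixed with their size. */
        void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeLong(degree);
            out.writeBoolean(active);
            out.writeInt(labels.size());
            for (Long label : labels) {
                out.writeLong(label);
            }
            out.writeInt(neighborDegree.size());
            for (Map.Entry<Long, Long> e : neighborDegree.entrySet()) {
                out.writeLong(e.getKey());
                out.writeLong(e.getValue());
            }
        }

        /** Reads the fields back in exactly the order and with exactly the types used by write. */
        void read(DataInput in) throws IOException {
            id = in.readInt();
            degree = in.readLong();
            active = in.readBoolean();
            int labelCount = in.readInt();
            labels = new ArrayList<>(labelCount);
            for (int i = 0; i < labelCount; i++) {
                labels.add(in.readLong());
            }
            int entryCount = in.readInt();
            neighborDegree = new HashMap<>();
            for (int i = 0; i < entryCount; i++) {
                long key = in.readLong();
                long value = in.readLong();
                neighborDegree.put(key, value);
            }
        }
    }

    /** Serializes the value to a byte array and reads it into a fresh instance. */
    static LocalStorage roundTrip(LocalStorage original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            LocalStorage copy = new LocalStorage();
            copy.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LocalStorage s = new LocalStorage();
        s.id = 3;
        s.degree = 2L;
        s.active = true;
        s.labels.add(7L);
        s.labels.add(9L);
        s.neighborDegree.put(2L, 5L);

        LocalStorage copy = roundTrip(s);
        System.out.println(copy.active + " " + copy.labels + " " + copy.neighborDegree);
        // prints: true [7, 9] {2=5}
    }
}
```

If `write` and `read` disagree on the order or type of a single field, every field after it is decoded from the wrong bytes, which would be one way for a flag set in one phase to come back with a different value in the next.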

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

kaansancak
In reply to this post by Till Rohrmann
If the vertex value type is a POJO, what happens during the union of the graphs? Is there a persistent approach, or can we define a function to handle such cases?

Would there be a performance difference between these two cases?

1)
Graph graph = … // From edges list

graph = graph.runScatterGatherIteration();

Graph secondGraph = … // From second edge list

graph = graph.union(secondGraph).runScatterGatherIteration();

2)

Graph graph = … // From edges list

graph = graph.runScatterGatherIteration();

graph = graph.addEdges(second_edge_list); // Graphs are immutable; addEdges returns a new Graph

graph = graph.runScatterGatherIteration();


Before starting the second scatter-gather iteration, I want to set/reset some fields of the vertex values of the vertices that are affected by edge additions/deletions (or by the union). It would be good to have a callback function that touches the endpoints of the edges that are added/deleted.

Best
Kaan



In reply to Till Rohrmann's message of Apr 15, 2020, 11:07 AM.

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

Yun Gao
Hi Kaan,

For the first issue, I think the two implementations should differ in performance, and the first should be slower. However, which one to use should depend on whether your algorithm can compute incrementally using only the changed edges. As far as I know, most graph algorithms do not satisfy this property, so I think you might have to use the first one.

For the second issue, I think you could use graph.getVertices() and graph.getEdges() to get the underlying vertex and edge datasets of the graph. You can then apply any operations to the two datasets, such as joining the vertex dataset with the second edge list, and finally create a new Graph with Graph.fromDataSet(updated_vertices, edges, env).

Best,
 Yun

In reply to Kaan Sancak's message of Thu, Apr 16, 2020 at 17:17.

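The join Yun describes amounts to looking up both endpoints of every added edge in the vertex set and updating their values before the next iteration. Below is a minimal plain-Java sketch of that step, using in-memory maps rather than Gelly DataSets. The names `TouchEndpoints`, `VertexValue`, and `touchEndpoints` and the example data are hypothetical, and turning unknown endpoints into new vertices is an assumption of this sketch rather than something stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TouchEndpoints {

    /** Hypothetical vertex value holding only the two fields this example needs. */
    static final class VertexValue {
        final long degree;
        final boolean active;

        VertexValue(long degree, boolean active) {
            this.degree = degree;
            this.active = active;
        }
    }

    /**
     * Returns a new vertex map in which only the endpoints of the added edges are active
     * and each endpoint's degree grows by one per added edge that touches it.
     * Each edge is a long[]{source, target}. The input map is left unchanged.
     */
    static Map<Long, VertexValue> touchEndpoints(Map<Long, VertexValue> vertices,
                                                 List<long[]> addedEdges) {
        // Count how many added edges touch each vertex id.
        Map<Long, Long> touched = new HashMap<>();
        for (long[] edge : addedEdges) {
            touched.merge(edge[0], 1L, Long::sum);
            touched.merge(edge[1], 1L, Long::sum);
        }

        Map<Long, VertexValue> updated = new HashMap<>();
        // Existing vertices: reset the flag, then re-activate only the touched ones.
        for (Map.Entry<Long, VertexValue> v : vertices.entrySet()) {
            long extra = touched.getOrDefault(v.getKey(), 0L);
            updated.put(v.getKey(), new VertexValue(v.getValue().degree + extra, extra > 0));
        }
        // Endpoints that were not in the graph yet become new, active vertices.
        for (Map.Entry<Long, Long> t : touched.entrySet()) {
            updated.putIfAbsent(t.getKey(), new VertexValue(t.getValue(), true));
        }
        return updated;
    }

    public static void main(String[] args) {
        // Path 1 - 2 - 3, all vertices inactive after the first run.
        Map<Long, VertexValue> vertices = new HashMap<>();
        vertices.put(1L, new VertexValue(1, false));
        vertices.put(2L, new VertexValue(2, false));
        vertices.put(3L, new VertexValue(1, false));

        List<long[]> added = new ArrayList<>();
        added.add(new long[] {3L, 4L});

        Map<Long, VertexValue> updated = touchEndpoints(vertices, added);
        for (long id = 1; id <= 4; id++) {
            VertexValue value = updated.get(id);
            System.out.println(id + ": degree=" + value.degree + ", active=" + value.active);
        }
    }
}
```

In Gelly itself, the corresponding step would operate on the DataSet returned by `graph.getVertices()` and on the new edge DataSet, with the result passed to `Graph.fromDataSet(...)` before the second scatter-gather run.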

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

kaansancak
Thanks, that worked!

I wonder what the performance difference would be if I implemented this with Stateful Functions. Does anyone know of recent work or papers on a similar approach?

Best
Kaan


In reply to Yun Gao's message of 16 Apr 2020, 10:00.