Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

Posted by kaansancak on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Question-about-Writing-Incremental-Graph-Algorithms-using-Apache-Flink-Gelly-tp34253p34346.html

Thanks that is working now!

I have one last question.

Goin one step further, I have changed vertex value type to be a POJO class. The structure is somewhat similar to this,

class LocalStorage {
Integer id;
Long degree;
        Boolean active;
        List<Long> labels;
        Map<Long, Long> neighborDegree;
       
       ….
}

During the execution, I got an error saying that `org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.util.ArrayList>) cannot be used as key`.

 After having some reading I have implemented Value and Comparable interfaces . But now, after scatter phase ends, the localstorage values seems to be not persistent. For, I set the the active flag true in scatter phase, but during gather phase, the `active` flag seems to be null. (Note that, I have already know the degree and id is accessible within vertex context, but I am just trying to see what can I do with the framework).

I am guessing  this is a serialization/deserialization related issue. I had some online digging and github search but I couldn’t really find a fix it. I have tried some approaches suggested on SO but they didn’t work for me. Is this a problem related to my POJO class having list and map types? Is this supported?

It would be great if someone can point out a similar example or an easy fix.

Best
Kaan

On Apr 15, 2020, at 11:07 AM, Till Rohrmann <[hidden email]> wrote:

Hi Kaan,

I think what you are proposing is something like this:

Graph<Long, Double, Double> graph = ... // get first batch

Graph<Long, Double, Double> graphAfterFirstSG = graph.runScatterGatherIteration();

Graph<Long, Double, Double> secondBatch = ... // get second batch

// Adjust the result of SG iteration with secondBatch

Graph<Long, Double, Double> updatedGraph = graphAfterFirstSG.union/difference(secondBatch));

updatedGraph.runScatterGatherIteration();

Then I believe this should work.

Cheers,
Till

On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak <[hidden email]> wrote:
Thanks for the useful information! It seems like a good and fun idea to experiment. I will definitely give it a try.

I have a very close upcoming deadline and I have already implemented the Scatter-Gather iteration algorithm.

I have another question on whether we can chain Scatter-Gather or Vertex-Centric iterations.
Let’s say that we have an initial batch/dataset, we run a Scatter-Gather and obtain graph.
Using another batch we added/deleted vertices to the graph we obtained. 
Now we run another Scatter-Gather on the modified graph.

This is no streaming but a naive way to simulate batch updates that are happening concurrently.
Do you think it is a feasible way to do this way? 

Best
Kaan

On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Hi,

As you mentioned, Gelly Graph's are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, a recent new API added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's an Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/