Hi,
I have a question about generating a sub-graph using the Spargel API. We use filterOnVertices to generate it.

With 30MB of edges, the code gets stuck at Join(Join at filterOnVertices). With 2MB of edges, the code doesn't have this issue.

Log
------------------------------------------------------------------------------------------------------------------
02/18/2015 10:34:23: Join(Join at filterOnVertices(Graph.java:615)) (7/20) switched to FINISHED
02/18/2015 10:34:23: Join(Join at filterOnVertices(Graph.java:615)) (12/20) switched to FINISHED
02/18/2015 10:34:23: Join(Join at filterOnVertices(Graph.java:615)) (14/20) switched to FINISHED
02/18/2015 10:34:23: Join(Join at filterOnVertices(Graph.java:615)) (17/20) switched to FINISHED
02/18/2015 10:34:23: Join(Join at filterOnVertices(Graph.java:615)) (20/20) switched to FINISHED
02/18/2015 10:34:23: Join(Join at filterOnVertices(Graph.java:615)) (13/20) switched to FINISHED
02/18/2015 10:34:24: Join(Join at filterOnVertices(Graph.java:615)) (8/20) switched to FINISHED
02/18/2015 10:34:24: Join(Join at filterOnVertices(Graph.java:615)) (2/20) switched to FINISHED
02/18/2015 10:34:24: Join(Join at filterOnVertices(Graph.java:615)) (3/20) switched to FINISHED
02/18/2015 10:34:24: Join(Join at filterOnVertices(Graph.java:615)) (19/20) switched to FINISHED
02/18/2015 10:34:24: Join(Join at filterOnVertices(Graph.java:615)) (16/20) switched to FINISHED

It takes more than 10 minutes to continue, while other operators complete in seconds. From the log, it looks like some workers finish and some don't.

The Spargel API shows that it uses join twice, so this operator looks a bit expensive. Could this be the reason the job gets stuck?

Our goal in using filterOnVertices is to use the sub-graph as the input for the next iteration.
------------------------------------------------------------------------------------------------------------------
public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) {

    DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);

    DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
            .where(0).equalTo(0)
            .with(new ProjectEdge<K, VV, EV>())
            .join(filteredVertices).where(1).equalTo(0)
            .with(new ProjectEdge<K, VV, EV>());

    return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
}

Best regards,
Hung
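To make the effect of the two joins in filterOnVertices concrete, here is a minimal plain-Java sketch of the same filtering logic on in-memory collections. It does not use Flink or Gelly: the class SubgraphSketch, the Edge type, and both method names are made up for illustration. The assumption is that an edge survives only if both its source and its target pass the vertex filter, which is what joining the edges with the filtered vertices first on field 0 and then on field 1 would achieve.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Plain-Java illustration only; all names here are hypothetical, not Flink API.
final class SubgraphSketch {

    // Hypothetical edge type: just a source and a target vertex ID.
    static final class Edge {
        final int source;
        final int target;

        Edge(int source, int target) {
            this.source = source;
            this.target = target;
        }
    }

    // Keep only the vertices accepted by the filter.
    static Set<Integer> filterVertices(Set<Integer> vertices, Predicate<Integer> keep) {
        Set<Integer> kept = new HashSet<>();
        for (Integer v : vertices) {
            if (keep.test(v)) {
                kept.add(v);
            }
        }
        return kept;
    }

    // Keep an edge only if both endpoints survived the vertex filter.
    // The first check mirrors the join on the source field,
    // the second check mirrors the join on the target field.
    static List<Edge> remainingEdges(List<Edge> edges, Set<Integer> kept) {
        List<Edge> result = new ArrayList<>();
        for (Edge e : edges) {
            if (kept.contains(e.source) && kept.contains(e.target)) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> vertices = new HashSet<>(Arrays.asList(1, 2, 3, 4));
        List<Edge> edges = Arrays.asList(
                new Edge(1, 2), new Edge(2, 3), new Edge(3, 4), new Edge(4, 1));

        // Drop vertex 3; every edge touching it should disappear as well.
        Set<Integer> kept = filterVertices(vertices, v -> v != 3);
        List<Edge> rest = remainingEdges(edges, kept);

        System.out.println("vertices kept: " + kept.size());
        System.out.println("edges kept: " + rest.size());
    }
}
```

In this single-machine form the work is linear in the number of edges. In a distributed job each join may also repartition the edge set across workers, so the sketch says nothing about where the actual run spends its time.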
Hi Hung,

can you share some details on your algorithm and dataset? I could not reproduce this by just running filterOnVertices on a large input.

Thank you,
Vasia.
Thank you for your reply.
The dataset:
The 1MB dataset has 38,831 nodes and 99,565 edges and doesn't get stuck.
The 30MB dataset has 1,134,890 nodes and 2,987,624 edges and gets stuck.

Our code follows this logic:

do {
    filteredGraph = graph.run(algorithm);

    // Get the sub-graph for the next iteration; this is where the code gets stuck with the 30MB dataset
    filteredGraph = filteredGraph.filterOnVertices(new FilterVertex());
} while (filteredGraph.hasNode());

The filter function is as follows:
-------------------------------------------------------------------------------------------------------
public static final class FilterVertex implements FilterFunction<Tuple4<Integer, Integer, Integer, Integer>> {

    @Override
    public boolean filter(Tuple4<Integer, Integer, Integer, Integer> value) throws Exception {
        return value.f0 == -1;
    }
}
Hi Hung,

I am under the impression that circular dependencies like the one you are describing are not allowed in the Flink execution graph. I would actually expect something like this to cause an error. Maybe someone else can elaborate on that?

In any case, the proper way to write iterative programs in Flink is to use the dedicated iteration operators. As far as I understand, you want to run an iterative algorithm on a graph, where you feed the result of one iteration to the next. Is this the case?

For the moment, if you want to use a Graph API, you can use either Gelly's runVertexCentricIteration or Spargel. If your algorithm requires more flexibility, you can build an arbitrary dataflow inside an iteration using Flink's iteration operators.

Take a look at [1] for a description of how to use those, and let us know if you have any doubts.

Cheers,
V.
Thank you for the information you provided.
Yes, it runs an iterative algorithm on a graph and feeds the result of one iteration to the next.

The job no longer gets stuck when the maximum number of iterations in the algorithm is increased, e.g. to 1000 vertex-centric iterations; before, it was 10.

The algorithm finds one maximal independent set in the while loop. After some iterations, it only finds a maximal independent set consisting of a single node.

This idea might be wrong, but I would guess that the join gets stuck because no maximal independent set is found in that vertex-centric iteration, so filtering cannot reduce the graph to a sub-graph.
Sounds like what is getting stuck here is not Flink but your while loop, which never reaches the point where there is no node left in the graph.

--
mail: [hidden email]
phone: +49 176 / 96 52 999 7
www: http://cebe.cc/
pgp: http://cebe.cc/cebe_pub.asc
skype: skype://cebe08
irc: cebe @ irc.freenode.net
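One way to check whether the outer loop, rather than the join, is what never finishes is to guard the loop against rounds that remove nothing. The following plain-Java sketch shows such a guard on an in-memory vertex set. It is not Flink code: LoopGuardSketch, Outcome, and run are hypothetical names, and the step function stands in for "run the algorithm, then filter the vertices".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.UnaryOperator;

// Plain-Java illustration only; all names here are hypothetical, not Flink API.
final class LoopGuardSketch {

    // Why the driver loop stopped.
    enum Outcome { EMPTY, NO_PROGRESS, MAX_ROUNDS }

    // Apply 'step' repeatedly until the vertex set is empty,
    // a round fails to shrink it, or 'maxRounds' rounds have run.
    static Outcome run(Set<Integer> vertices, UnaryOperator<Set<Integer>> step, int maxRounds) {
        Set<Integer> current = new HashSet<>(vertices);
        for (int round = 0; round < maxRounds; round++) {
            if (current.isEmpty()) {
                return Outcome.EMPTY;
            }
            Set<Integer> next = step.apply(current);
            // Without this check, a round that removes nothing would repeat forever
            // in an unbounded do/while loop.
            if (next.size() >= current.size()) {
                return Outcome.NO_PROGRESS;
            }
            current = next;
        }
        return current.isEmpty() ? Outcome.EMPTY : Outcome.MAX_ROUNDS;
    }

    public static void main(String[] args) {
        Set<Integer> vertices = new HashSet<>(Arrays.asList(1, 2, 3));

        // A step that removes nothing, as when the filter matches every remaining vertex.
        Outcome outcome = run(vertices, s -> new HashSet<>(s), 100);
        System.out.println("loop stopped because: " + outcome);
    }
}
```

If a guard like this reports NO_PROGRESS on the 30MB dataset, the missing termination condition would be the more likely culprit than the cost of the join itself.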