http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Too-few-memory-segments-provided-exception-tp2176.html
Hello,
I am working on a program that implements the Adamic-Adar algorithm using Gelly.
I am running into this exception for all the joins (including the ones that are part of the reduceOnNeighbors function):
Too few memory segments provided. Hash Join needs at least 33 memory segments.
The problem persists even when I comment out some of the joins.
It also persists when I add a broadcast join hint, as suggested by @AndraLungu:
edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());
The code is:
DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
//gather the neighbors of each vertex into a HashSet stored in its value
computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
//join in the degrees to get the vertices with updated values for the final graph, which will be used to compute the Adamic-Adar edges
Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());
Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
Graph.fromDataSet(Vertices, edges, env);
//configure Vertex Centric Iteration
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setName("Find Adamic Adar Edge Weights");
parameters.setDirection(EdgeDirection.ALL);
//run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
//Extract Vertices of the updated graph
DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
//Extract the list of Edges from the vertex values
DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
//Partial weights for the edges are added
edg = edg.groupBy(0,1).reduce(new AdamGroup());
//Graph is updated with the Adamic Adar Edges
edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());
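For reference, the quantity this pipeline is meant to produce can be sketched in plain Java without Flink. This is only a minimal illustration and assumes the standard Adamic-Adar definition: the weight of a pair (u, v) is the sum, over their common neighbors z, of 1 / ln(degree(z)). The class name AdamicAdarSketch, the helper methods, and the sample edge list are hypothetical and are not part of the job above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free sketch of the Adamic-Adar weight computation.
public class AdamicAdarSketch {

    // Builds an undirected adjacency map (vertex -> set of neighbors) from an edge list.
    static Map<Long, Set<Long>> neighbors(long[][] edges) {
        Map<Long, Set<Long>> adj = new HashMap<>();
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }
        return adj;
    }

    // Sums 1 / ln(degree(z)) over every common neighbor z of u and v.
    // For distinct u and v, a common neighbor has degree >= 2, so ln(degree) > 0.
    static double score(Map<Long, Set<Long>> adj, long u, long v) {
        Set<Long> ofU = adj.getOrDefault(u, Collections.emptySet());
        Set<Long> ofV = adj.getOrDefault(v, Collections.emptySet());
        double sum = 0.0;
        for (long z : ofU) {
            if (ofV.contains(z)) {
                sum += 1.0 / Math.log(adj.get(z).size());
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Assumed sample graph: a triangle 1-2-3 with a pendant vertex 4 attached to 3.
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 4}};
        Map<Long, Set<Long>> adj = neighbors(edges);
        // Only existing edges are scored, mirroring the final join against graph.getEdges().
        for (long[] e : edges) {
            System.out.println(e[0] + "-" + e[1] + " : " + score(adj, e[0], e[1]));
        }
    }
}
```

In the Gelly version the same sum is built up in pieces: each vertex emits a partial weight for every pair of its neighbors, groupBy(0,1).reduce(...) adds the partial weights per pair, and the last join keeps only the pairs that are edges of the original graph.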
Any idea how I could resolve this exception?