package graphdistance; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexCentricConfiguration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; import java.io.Serializable; @SuppressWarnings("serial") public class APSP & Serializable> implements GraphAlgorithm, NullValue> { private final K srcVertexId; private final Integer maxIterations; public APSP(K srcVertexId, Integer maxIterations) { this.srcVertexId = srcVertexId; this.maxIterations = maxIterations; } @Override public Graph, NullValue> run(Graph, NullValue> input) { // VertexCentricConfiguration : UNMANAGED MEMORY because of FLINK-1916 bug VertexCentricConfiguration parameters = new VertexCentricConfiguration(); parameters.setSolutionSetUnmanagedMemory(false); return input.mapVertices(new InitVerticesMapper(srcVertexId)) .runVertexCentricIteration(new VertexDistanceUpdater, Integer>(srcVertexId), new MinDistanceMessenger, Integer, NullValue>(srcVertexId), maxIterations, parameters); } public static final class InitVerticesMapper & Serializable> implements MapFunction>, Tuple2> { private K srcVertexId; private int srcVertexIdInteger; public InitVerticesMapper(K srcId) { this.srcVertexId = srcId; this.srcVertexIdInteger = Integer.valueOf(((Integer) srcVertexId).intValue()); } public Tuple2 map(Vertex> vertex) { Integer[] vertexDistanceVector = vertex.f1.f0; try { vertexDistanceVector [srcVertexIdInteger] = 2000000000 ; // = java Integer max value // 2,147,483,647 => type overflow } catch (Exception e) { System.out.println("DEBUG vertexDistanceVector length = " + vertexDistanceVector.length + " srcVertexIdInteger = " + srcVertexIdInteger); } if (vertex.f0.equals(srcVertexId)) { //System.out.println("DEBUG setting vertexDistanceVector[ " + srcVertexIdInteger + " ] = 0L \n"); vertexDistanceVector[srcVertexIdInteger] = 0; } return new Tuple2(vertexDistanceVector,vertex.f1.f1); } } /** * Function that updates the value of a vertex by picking the minimum * distance from all incoming messages. * * @param */ public static final class VertexDistanceUpdater & Serializable, VV, M> extends VertexUpdateFunction, Integer> { private int srcVertexId; public VertexDistanceUpdater(K srcVertexId) { this.srcVertexId = Integer.valueOf(((Integer) srcVertexId).intValue()); } @Override public void updateVertex(Vertex> vertex, MessageIterator inMessages) { Integer[] vertexDistanceVector = vertex.getValue().f0; //int vertexKeyInteger = Integer.valueOf(((Integer) vertexKey).intValue()); Integer minDistance = 2000000000; //Integer minDistance = Integer.MAX_VALUE; // 2,147,483,647 => type overflow for (Integer msg : inMessages) { // System.out.println("DEBUG updateVertex: vertexKey: " + vertexKey + " vertexValue: " + vertexValue // + " inMessage: " + msg); if (msg < minDistance) { minDistance = msg; } } if (vertexDistanceVector[srcVertexId] > minDistance) { vertexDistanceVector[srcVertexId] = minDistance; setNewVertexValue(new Tuple2(vertexDistanceVector, vertex.getValue().f1)); } } } /** * Distributes the minimum distance associated with a given vertex among all * the target vertices summed up with the edge's value. * * @param */ public static final class MinDistanceMessenger & Serializable, VV, M, EV> extends MessagingFunction, Integer, NullValue> { private int srcVertexId; public MinDistanceMessenger(K srcVertexId) { this.srcVertexId = Integer.valueOf(((Integer) srcVertexId).intValue()); } @Override public void sendMessages(Vertex> vertex) throws Exception { Integer[] vertexDistanceVector = vertex.getValue().f0; //int vertexKeyInteger = Integer.valueOf(((Integer) vertexKey).intValue()); // System.out.println("DEBUG vertexKeyInteger = " + vertexKeyInteger); // System.out.println("DEBUG vertexDistanceVector.length = " + vertexDistanceVector.length); Integer distanceToPropagate = vertexDistanceVector[srcVertexId]; for (Edge edge : getEdges()) { // System.out.println("DEBUG sendMessage from: " + edge.getSource() + // " to: " + edge.getTarget() + " with distance: " + (distanceToPropagate + 1L)); sendMessageTo(edge.getTarget(), (distanceToPropagate + 1)); // NullValue => 1L (Integer) on edges } } } }