/*
 * APSP - a Flink Gelly GraphAlgorithm that relaxes shortest-path distances from ONE source
 * vertex (srcVertexId) using a Spargel vertex-centric iteration bounded by maxIterations.
 *
 * Vertex value layout, as used by the code below:
 *   f0 : Long[] distance vector; only the slot at index srcVertexId.intValue() is read/written here.
 *   f1 : a second component that is passed through unchanged (its type is not visible in this file).
 *
 * Parts:
 *   run(input)            - builds an IterationConfiguration with setSolutionSetUnmanagedMemory(true)
 *                           (the in-code comment attributes this to FLINK-1916), initialises vertices
 *                           with InitVerticesMapper, then calls runVertexCentricIteration with
 *                           VertexDistanceUpdater, MinDistanceMessenger, maxIterations and that config.
 *   InitVerticesMapper    - writes 999999999999L into the source slot of the vertex's distance vector,
 *                           then overwrites it with 0L when the vertex id equals srcVertexId. The
 *                           incoming array is modified in place and re-wrapped in a new Tuple2.
 *   VertexDistanceUpdater - takes the minimum of all incoming Long messages (starting from
 *                           999999999999L); if the stored source-slot distance is greater, stores the
 *                           minimum and calls setNewVertexValue.
 *   MinDistanceMessenger  - sends (source-slot distance + 1L) to the target of every outgoing edge,
 *                           i.e. every edge counts as length 1 (edge values are NullValue).
 *
 * NOTE(review): 999999999999L is used as the "unreached" sentinel instead of Long.MAX_VALUE; the
 *   in-code comments mention a "type overflow" - presumably because the messenger adds 1L to it.
 * NOTE(review): the vertex id is cast with (Long) and narrowed via intValue() to index the vector,
 *   so this only works when K is Long at runtime, and nothing here checks that the index is within
 *   the vector's length - confirm against callers.
 * NOTE(review): despite the name, a single run fills only one slot of the vector; presumably
 *   all-pairs results come from running once per source vertex - confirm against callers.
 * NOTE(review): this copy of the file looks damaged - generic type arguments appear to be missing
 *   (e.g. "class APSP & Serializable>", "MapFunction>, Tuple2>") and the original line breaks are
 *   collapsed, so each "//" comment swallows the code that follows it on the same physical line.
 *   Restore from version control before building.
 */
package graphdistance; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.IterationConfiguration; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; import java.io.Serializable; @SuppressWarnings("serial") public class APSP & Serializable> implements GraphAlgorithm, NullValue> { private final K srcVertexId; private final Integer maxIterations; public APSP(K srcVertexId, Integer maxIterations) { this.srcVertexId = srcVertexId; this.maxIterations = maxIterations; } @Override public Graph, NullValue> run(Graph, NullValue> input) { // VertexCentricIteration configuration: UNMANAGED MEMORY because of FLINK-1916 bug IterationConfiguration parameters = new IterationConfiguration(); parameters.setSolutionSetUnmanagedMemory(true); return input.mapVertices(new InitVerticesMapper(srcVertexId)) .runVertexCentricIteration(new VertexDistanceUpdater, Long>(srcVertexId), new MinDistanceMessenger, Long, NullValue>(srcVertexId), maxIterations, parameters); // return input.mapVertices(new InitVerticesMapper(srcVertexId)) // .runVertexCentricIteration(new VertexDistanceUpdater, Long>(), // new MinDistanceMessenger, Long, NullValue>(), // maxIterations); } public static final class InitVerticesMapper & Serializable> implements MapFunction>, Tuple2> { private K srcVertexId; private int srcVertexIdInteger; public InitVerticesMapper(K srcId) { this.srcVertexId = srcId; this.srcVertexIdInteger = Integer.valueOf(((Long) srcVertexId).intValue()); } public Tuple2 map(Vertex> value) { Long[] vertexDistanceVector = value.f1.f0; vertexDistanceVector [srcVertexIdInteger] = 999999999999L; // 
9223372036854775807L => type overflow if (value.f0.equals(srcVertexId)) { //System.out.println("DEBUG setting vertexDistanceVector[ " + srcVertexIdInteger + " ] = 0L \n"); vertexDistanceVector[srcVertexIdInteger] = 0L; } return new Tuple2(vertexDistanceVector,value.f1.f1); // if (value.f0.equals(srcVertexId)) { // return new Tuple2(0L, value.f1.f1); // } else { // return new Tuple2(999999999999L, value.f1.f1); // //return Long.MAX_VALUE; // 9223372036854775807L => type overflow // } } } /** * Function that updates the value of a vertex by picking the minimum * distance from all incoming messages. * * @param */ public static final class VertexDistanceUpdater & Serializable, VV, M> extends VertexUpdateFunction, Long> { private int srcVertexId; public VertexDistanceUpdater(K srcVertexId) { this.srcVertexId = Integer.valueOf(((Long) srcVertexId).intValue()); } @Override public void updateVertex(K vertexKey, Tuple2 vertexValue, MessageIterator inMessages) { Long[] vertexDistanceVector = vertexValue.f0; //int vertexKeyInteger = Integer.valueOf(((Long) vertexKey).intValue()); Long minDistance = 999999999999L; //Long minDistance = Long.MAX_VALUE; // 9223372036854775807L => type overflow for (Long msg : inMessages) { // System.out.println("DEBUG updateVertex: vertexKey: " + vertexKey + " vertexValue: " + vertexValue // + " inMessage: " + msg); if (msg < minDistance) { minDistance = msg; } } if (vertexDistanceVector[srcVertexId] > minDistance) { vertexDistanceVector[srcVertexId] = minDistance; setNewVertexValue(new Tuple2(vertexDistanceVector, vertexValue.f1)); } } } /** * Distributes the minimum distance associated with a given vertex among all * the target vertices summed up with the edge's value. 
* * @param */ public static final class MinDistanceMessenger & Serializable, VV, M, EV> extends MessagingFunction, Long, NullValue> { private int srcVertexId; public MinDistanceMessenger(K srcVertexId) { this.srcVertexId = Integer.valueOf(((Long) srcVertexId).intValue()); } @Override public void sendMessages(K vertexKey, Tuple2 vertexValue) throws Exception { Long[] vertexDistanceVector = vertexValue.f0; //int vertexKeyInteger = Integer.valueOf(((Long) vertexKey).intValue()); // System.out.println("DEBUG vertexKeyInteger = " + vertexKeyInteger); // System.out.println("DEBUG vertexDistanceVector.length = " + vertexDistanceVector.length); Long distanceToPropagate = vertexDistanceVector[srcVertexId]; for (Edge edge : getOutgoingEdges()) { // System.out.println("DEBUG sendMessage from: " + edge.getSource() + // " to: " + edge.getTarget() + " with distance: " + (distanceToPropagate + 1L)); sendMessageTo(edge.getTarget(), (distanceToPropagate + 1L)); // NullValue => 1L (Long) on edges } } } }