package graphdistance; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.types.NullValue; public class APSPNaiveJob implements ProgramDescription { public static void main(String[] args) throws Exception { if (!parseParameters(args)) { return; } JobExecutionResult intermediateResult; long jobRuntime = 0L; long startTime = System.currentTimeMillis(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //env.setParallelism(1); DataSet >> vertices = getVerticesDataSet(env); DataSet >> intermediateVertices = getIntermediateVerticesDataSet(env); DataSet> edges = getEdgesDataSet(env); Graph, NullValue> graph = Graph.fromDataSet(vertices, edges, env); // ************************************************************************************************** // APSP COMPUTATION // ************************************************************************************************** // first run of SSSP; write intermediate result (graph vertices) System.out.println("!!! Executing SSSP for srcVertexId = " + srcVertexId); Graph, NullValue> intermediateGraph = graph.run(new APSP(srcVertexId, maxIterations)); System.out.println("DEBUG intermediateGraph.numberOfVertices() = " + intermediateGraph.numberOfVertices() + " SSSP iteration = " + srcVertexId); intermediateGraph.getVertices().writeAsCsv(tempGraphOutputPath, "\n", ";", WriteMode.OVERWRITE); intermediateResult = env.execute("KAPSPNaive"); jobRuntime += intermediateResult.getNetRuntime(); srcVertexId++; // run SSSP n-1 times while (srcVertexId < numOfVertices) { System.out.println("!!! Executing SSSP for srcVertexId = " + srcVertexId); Graph, NullValue> intermediateGraph2 = Graph.fromDataSet(intermediateVertices, edges, env); System.out.println("DEBUG intermediateGraph2.numberOfVertices() b4 comp = " + intermediateGraph2.numberOfVertices() + " SSSP iteration = " + srcVertexId); intermediateGraph2 = intermediateGraph2.run(new APSP(srcVertexId, maxIterations)); System.out.println("DEBUG intermediateGraph2.numberOfVertices() after comp = " + intermediateGraph2.numberOfVertices() + " SSSP iteration = " + srcVertexId); // TODO problem with overwriting tempGraphOutputPath !!! intermediateGraph2.getVertices().writeAsCsv(tempGraphOutputPath, "\n", ";", WriteMode.OVERWRITE); intermediateResult = env.execute("KAPSPNaive"); jobRuntime += intermediateResult.getNetRuntime(); srcVertexId++; } //////////////// //System.out.println(env.getExecutionPlan()); long endTime = System.currentTimeMillis(); System.out.println("APSPNaive JOB stats --------------------------------------------------"); System.out.println("APSPNaive JOB net runtime [Flink] = " + jobRuntime/1000); System.out.println("APSPNaive JOB total runtime [System] = " + ((endTime-startTime)/1000) ); System.out.println("APSPNaive JOB no. input vertices = " + numOfVertices); } @Override public String getDescription() { return "APSPNaive Job"; } // ****************************************************************************************************************** // UTIL METHODS APSP // ****************************************************************************************************************** private static boolean fileOutput = false; private static Integer srcVertexId = null; private static int numOfVertices = 0; private static String verticesInputPath = null; private static String edgesInputPath = null; private static String tempGraphOutputPath = null; private static int maxIterations = 10; @SuppressWarnings("unused") private static double textSimThreshold = 0d; @SuppressWarnings("unused") private static String finalResultPath = null; // graph distance threshold for k-APSP @SuppressWarnings("unused") private static int grapDistThreshold = 0; private static boolean parseParameters(String[] args) { if (args.length > 0) { if (args.length == 9) { fileOutput = true; srcVertexId = Integer.parseInt(args[0]); numOfVertices = Integer.parseInt(args[1]); verticesInputPath = args[2]; edgesInputPath = args[3]; tempGraphOutputPath = args[4]; maxIterations = Integer.parseInt(args[5]); textSimThreshold = Double.parseDouble(args[6]); finalResultPath = args[7]; grapDistThreshold = Integer.parseInt(args[8]); } else { System.err.println("Usage: APSPNaive " + " " + " "); return false; } } return true; } //read intermediate vertex data set; written by explicit persistence between iterations private static DataSet>> getIntermediateVerticesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(tempGraphOutputPath) .fieldDelimiter(";") .lineDelimiter("\n") .types(Integer.class, String.class) .map(new IntermediateMapper(numOfVertices)); } else return null; } @SuppressWarnings("serial") private static final class IntermediateMapper implements MapFunction , Vertex>> { private int numVertices; public IntermediateMapper(int numOfVertices) { this.numVertices = numOfVertices; } @Override public Vertex> map(Tuple2 tuple2) throws Exception { // id;([id, id, id],tokenList) //Integer intermediateVertexId = tuple2.f0; String intermediateVertexValue = tuple2.f1; // remove parantheses from vertex value String intermediateVertexValueNoParantheses = intermediateVertexValue.substring(1, intermediateVertexValue.length()-1); //System.out.println("DEBUG intermediateVertexValueNoParantheses: " + intermediateVertexValueNoParantheses); // [id, id, id],tokenList String[] tempVertexValueParts = intermediateVertexValueNoParantheses.split("],"); String tempVertexDistVect = tempVertexValueParts[0].substring(1,tempVertexValueParts[0].length()); // remove starting [ String tempVertexTokenList = tempVertexValueParts [1]; //System.out.println("DEBUG tempVertexDistVect = " + tempVertexDistVect); //System.out.println("DEBUG tempVertexTokenList = " + tempVertexTokenList); String[] vertexDistanceVectorStr = tempVertexDistVect.split(", "); // distance vector: distances from all other vertices to current vector Integer[] vertexDistanceVector = new Integer[numVertices]; for (int i = 0; i< vertexDistanceVector.length; i++) { vertexDistanceVector[i]=Integer.parseInt(vertexDistanceVectorStr[i]); //System.out.println("DEBUG vertexDistanceVector["+ i +"] = " + vertexDistanceVector[i]); } return new Vertex>(tuple2.f0, new Tuple2(vertexDistanceVector,tempVertexTokenList)); } } private static DataSet>> getVerticesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(verticesInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") .types(Integer.class, String.class) .map(new InitializeMapper(numOfVertices)); } else { System.err.println("Usage: APSPNaive " + " " + " "); return APSPData.getDefaultVertexDataSet(env); } } @SuppressWarnings("serial") private static final class InitializeMapper implements MapFunction , Vertex>> { private int numVertices; public InitializeMapper(int numOfVertices) { this.numVertices = numOfVertices; } @Override public Vertex> map(Tuple2 tuple2) throws Exception { // distance vector: distances from all other vertices to current vector Integer[] vertexDistanceVector = new Integer[numVertices]; for (int i = 0; i< vertexDistanceVector.length; i++) vertexDistanceVector[i]=666; return new Vertex>(tuple2.f0, new Tuple2( vertexDistanceVector,tuple2.f1)); } } @SuppressWarnings("serial") private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(edgesInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") .types(Integer.class, Integer.class) .map(new MapFunction, Edge>() { @Override public Edge map(Tuple2 tuple2) throws Exception { return new Edge(tuple2.f0, tuple2.f1, NullValue.getInstance()); } }); } else { System.err.println("Usage: APSPNaive " + " " + " "); return APSPData.getDefaultEdgeDataSet(env); } } ////////////////////////////////////////////////////////////////// }