/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.SingleSourceShortestPathsDataUnweighted; import org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted; import org.apache.flink.types.NullValue; public class SingleSourceShortestPathsExampleUnweighted implements ProgramDescription { public static void main(String[] args) throws Exception { if (!parseParameters(args)) { return; } //Long tempLong = Long.MAX_VALUE; //System.out.println("DEBUG Long.MAX_VALUE: " + tempLong); System.out.println("DEBUG srcVertexId: " + srcVertexId); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> vertices = getVerticesDataSet(env); DataSet> edges = getEdgesDataSet(env); Graph graph = Graph.fromDataSet(vertices, edges, env); DataSet> singleSourceShortestPathsUnweighted = graph .run(new SingleSourceShortestPathsUnweighted(srcVertexId, maxIterations)).getVertices(); // emit result if (fileOutput) { singleSourceShortestPathsUnweighted.writeAsCsv(outputPath, "\n", " "); } else { singleSourceShortestPathsUnweighted.print(); } env.execute("Single Source Shortest Paths Example"); } @Override public String getDescription() { return "Single Source Shortest Paths"; } // ****************************************************************************************************************** // UTIL METHODS // ****************************************************************************************************************** private static boolean fileOutput = false; private static Long srcVertexId = null; private static String verticesInputPath = null; private static String edgesInputPath = null; private static String outputPath = null; private static int maxIterations = 5; private static boolean parseParameters(String[] args) { if (args.length > 0) { if (args.length == 5) { fileOutput = true; srcVertexId = Long.parseLong(args[0]); verticesInputPath = args[1]; edgesInputPath = args[2]; outputPath = args[3]; maxIterations = Integer.parseInt(args[4]); } else { System.err.println("Usage: SingleSourceShortestPaths " + " "); return false; } } return true; } @SuppressWarnings("serial") private static DataSet> getVerticesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(verticesInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") .types(Long.class, Long.class) .map(new MapFunction, Vertex>() { @Override public Vertex map(Tuple2 tuple2) throws Exception { return new Vertex(tuple2.f0, tuple2.f1); } }); } else { System.err.println("Usage: SingleSourceShortestPathsUnweighted " + " "); return SingleSourceShortestPathsDataUnweighted.getDefaultVertexDataSet(env); } } @SuppressWarnings("serial") private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(edgesInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") .types(Long.class, Long.class) .map(new MapFunction, Edge>() { @Override public Edge map(Tuple2 tuple2) throws Exception { return new Edge(tuple2.f0, tuple2.f1, NullValue.getInstance()); } }); } else { System.err.println("Usage: SingleSourceShortestPathsUnweighted " + " "); return SingleSourceShortestPathsDataUnweighted.getDefaultEdgeDataSet(env); } } }