package de.tuberlin.dima.aim3.assignment3; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.DataSet; public class PowerIteration { //path to input static String input = null; //path to output static String output = null; //number of iterations (default = 7) static int iterations = 1; //threshold static double d = 0.01; public void run() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //read input file DataSet> matrixA = readMatrix(env, input); /**************** POWER ITERATION *****************/ //get initial vector - which equals matrixA * [1, ... , 1] DataSet> initial = matrixA.groupBy(0).sum(2); //normalize by maximum value initial = initial.cross(initial.max(2)).map(new normalizeByMax()); initial.writeAsCsv(output+"_initial"); DataSet> newIteration; //do iterations to determine dominant EigenVector (scaled) - STOP of difference < d for(int i = 0; i < iterations;i++){ newIteration = matrixA.join(initial).where(1).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2); //normalize by maximal value newIteration = newIteration.groupBy(0).sum(2).cross(newIteration.groupBy(0).sum(2).max(2)).map(new normalizeByMax()); if(initial.equals(newIteration)){ break; } initial = newIteration; } newIteration = initial; newIteration.writeAsCsv(output+"_EigenVector"); //determine now EigenValue by approximating the Rayleigh Quotient: //get Ax DataSet> Ax = matrixA.join(newIteration).where(1).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2).groupBy(0).sum(2); //get Ax * x DataSet> Axx = newIteration.join(Ax).where(0).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2).groupBy(1).sum(2); //now x * x DataSet> xx = newIteration.join(newIteration).where(0).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2).groupBy(1).sum(2); DataSet> RQ = Axx.cross(xx).map(new RQ()); RQ.groupBy(1).sum(2).writeAsCsv(output + "_EigenValue"); env.execute("Power Iteration"); } public static DataSource> readMatrix(ExecutionEnvironment env, String filePath) { CsvReader csvReader = env.readCsvFile(filePath); csvReader.fieldDelimiter(','); csvReader.includeFields("fttt"); return csvReader.types(Integer.class, Integer.class, Double.class); } public static final class ProjectJoinResultMapper implements MapFunction, Tuple3>, Tuple3> { @Override public Tuple3 map( Tuple2, Tuple3> value) throws Exception { Integer row = value.f0.f0; Integer column = value.f1.f1; Double product = value.f0.f2 * value.f1.f2; return new Tuple3(row, column, product); } } public static final class normalizeByMax implements MapFunction, Tuple3>, Tuple3> { @Override public Tuple3 map( Tuple2, Tuple3> value) throws Exception { return new Tuple3(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2)); } } public static final class RQ implements MapFunction, Tuple3>, Tuple3> { @Override public Tuple3 map( Tuple2, Tuple3> value) throws Exception { return new Tuple3(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2); } } public static void main(String[] args) throws Exception { if(args.length<2 || args.length > 4){ System.err.println("Usage: PowerIteration optional: * *"); System.exit(0); } input = args[0]; output = args[1]; if(args.length==3) { iterations = Integer.parseInt(args[2]); } if(args.length==4){ d = Double.parseDouble(args[3]); } new PowerIteration().run(); } }