Hi,
I have an issue with a for-loop. If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why. With 1, 2 or 3 it runs smoothly. I attached the code below and marked the loop with //PROBLEM. Thanks in advance! Lydia package org.apache.flink.contrib.lifescience.examples; |
Hi Lydia,
To build an iterative algorithm on Flink, using the iterations API [1] would be better than using a for-loop. Your program triggers multiple executions by calling `next.gap.print()` multiple times. In each execution, Flink redundantly reads the whole data set, which causes performance to decrease. Regards, Chiwan Park [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html > On Mar 27, 2016, at 7:16 AM, Lydia Ickler <[hidden email]> wrote: > > Hi, > > I have an issue with a for-loop. > If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why. > With 1, 2 or 3 it runs smoothly. > I attached the code below and marked the loop with //PROBLEM. > > Thanks in advance! > Lydia > > package org.apache.flink.contrib.lifescience.examples; > > import edu.princeton.cs.algs4.Graph; > import edu.princeton.cs.algs4.SymbolDigraph; > import org.apache.flink.api.common.functions.FilterFunction; > import org.apache.flink.api.common.functions.FlatJoinFunction; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.aggregation.Aggregations; > import org.apache.flink.api.java.io.CsvReader; > import org.apache.flink.api.java.operators.DataSource; > import org.apache.flink.api.java.operators.IterativeDataSet; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.tuple.Tuple3; > import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils; > import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network; > import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge; > import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode; > import org.apache.flink.core.fs.FileSystem; > import org.apache.flink.util.Collector; > > import java.util.*; > > import static 
edu.princeton.cs.algs4.GraphGenerator.simple; > > public class PowerIteration { > > //path to input > static String input = null; > //path to output > static String output = null; > //number of iterations (default = 7) > static int iterations = 7; > //threshold > static double delta = 0.01; > > public void run() throws Exception { > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > > //read input file > DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input); > > DataSet<Tuple3<Integer, Integer, Double>> eigenVector; > DataSet<Tuple3<Integer, Integer, Double>> eigenValue; > > //initial: > //Approximate EigenVector by PowerIteration > eigenVector = PowerIteration_getEigenVector(matrixA); > //Approximate EigenValue by PowerIteration > eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector); > //Deflate original matrix > matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue); > > MyResult initial = new MyResult(eigenVector,eigenValue,matrixA); > > MyResult next = null; > > //PROBLEM!!! 
get i eigenvalue gaps > for(int i=0;i<2;i++){ > next = PowerIteration_routine(initial); > initial = next; > next.gap.print(); > } > > env.execute("Power Iteration"); > } > > public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env, > String filePath) { > CsvReader csvReader = env.readCsvFile(filePath); > csvReader.fieldDelimiter(","); > csvReader.includeFields("ttt"); > return csvReader.types(Integer.class, Integer.class, Double.class); > } > > public static final class ProjectJoinResultMapper implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, > Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > @Override > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > Integer row = value.f0.f0; > Integer column = value.f1.f1; > Double product = value.f0.f2 * value.f1.f2; > return new Tuple3<Integer, Integer, Double>(row, column, product); > } > } > > public static final class RQ implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > > @Override > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > > return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2); > } > } > > public static void main(String[] args) throws Exception { > if(args.length<2 || args.length > 4){ > System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>"); > System.exit(0); > } > > input = args[0]; > output = args[1]; > > if(args.length==3) { > iterations = Integer.parseInt(args[2]); > } > if(args.length==4){ > delta = Double.parseDouble(args[3]); > } > > new PowerIteration2().run(); > } > > public static final class deltaFilter implements 
FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> { > > public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) { > > if(!(candidate.f2 == old.f2)){ > out.collect(candidate); > } > > //if(Math.abs(candidate.f2-old.f2) > delta){ > // out.collect(candidate); > //} > > } > } > > public static final class normalizeByMax implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2)); > } > } > > public static final class firstX implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2)); > } > } > > public static final class resetIndex implements > MapFunction<Tuple3<Integer, Integer, Double>, > Tuple3<Integer, Integer, Double>> { > > public Tuple3<Integer, Integer, Double> map( > Tuple3<Integer, Integer, Double>value) > throws Exception { > return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2); > } > } > > > public static final class decBy1 implements > MapFunction<Tuple3<Integer, Integer, Double>, > Tuple3<Integer, Integer, Double>> { > > public Tuple3<Integer, Integer, Double> map( > Tuple3<Integer, Integer, Double>value) > throws Exception { > return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2); > } > } > > public static final 
class resetIndex2 implements > MapFunction<Tuple3<Integer, Integer, Double>, > Tuple3<Integer, Integer, Double>> { > > public Tuple3<Integer, Integer, Double> map( > Tuple3<Integer, Integer, Double>value) > throws Exception { > return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2); > } > } > > public static final class MatrixTimesValue implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > > @Override > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > > return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2)); > } > } > > public static final class MatrixMinusMatrix implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, > Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > @Override > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > Integer row = value.f0.f0; > Integer column = value.f0.f1; > Double result = value.f0.f2 - value.f1.f2; > return new Tuple3<Integer, Integer, Double>(row, column, result); > } > } > > public static final class getGapCenter implements > MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, > Tuple3<Integer, Integer, Double>>, > Tuple3<Integer, Integer, Double>> { > @Override > public Tuple3<Integer, Integer, Double> map( > Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) > throws Exception { > Integer row = value.f0.f0; > Integer column = value.f0.f1; > Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2))); > return new Tuple3<Integer, Integer, Double>(row, column, result); > } > } > > > public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws 
Exception { > > //get initial vector - which equals matrixA * [1, ... , 1] > DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2); > > //normalize by maximum value > DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax()); > > //BulkIteration to find dominant eigenvector > IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations); > > DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0) > .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2). > cross((matrixA.join(iteration).where(1).equalTo(0) > .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2)) > .map(new normalizeByMax()); > > DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter()); > DataSet<Tuple3<Integer, Integer, Double>> eigenVector = iteration.closeWith(intermediate,diffs); > > return eigenVector; > } > > public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) { > > //determine now EigenValue by approximating the Rayleigh Quotient: > //get Ax > DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0) > .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2); > //get Ax * x > DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0) > .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2); > > //now x * x > DataSet<Tuple3<Integer, Integer, Double>> xx = 
eigenVector.join(eigenVector).where(0).equalTo(0) > .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2); > > return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2); > } > > public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) { > > DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex()); > DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() { > public boolean filter(Tuple3<Integer, Integer, Double> value) { > return value.f0 == 0; > } > }); > firstVal = eigenValueReset.cross(firstVal).map(new firstX()); > DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() { > public boolean filter(Tuple3<Integer, Integer, Double> value) { > return value.f0 == 0; > } > }); > DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose()); > DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0). 
> map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2); > matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix()); > matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() { > public boolean filter(Tuple3<Integer, Integer, Double> value) { > return (value.f0 != 0) && (value.f1 != 0); > } > }); > > return matrixA.map(new decBy1()); > } > > public MyResult PowerIteration_routine(MyResult initial) throws Exception { > > //Approximate EigenVector by PowerIteration > DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA); > //Approximate EigenValue by PowerIteration > DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector); > //get gap > DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix()); > //Deflate original matrix > DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue); > > return new MyResult(eigenVector,eigenValue,matrixA,gap); > } > > public class MyResult { > DataSet<Tuple3<Integer, Integer, Double>> eigenVector; > DataSet<Tuple3<Integer, Integer, Double>> eigenValue; > DataSet<Tuple3<Integer, Integer, Double>> gap; > DataSet<Tuple3<Integer, Integer, Double>> matrixA; > > public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){ > this.eigenVector = eigenVector; > this.eigenValue =eigenValue; > this.matrixA = matrixA; > } > > public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){ > this.eigenVector = eigenVector; > this.eigenValue =eigenValue; > this.matrixA = matrixA; > 
this.gap = gap; > } > } > > } > |
Free forum by Nabble | Edit this page |