for loop slow

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

for loop slow

Lydia Ickler
Hi,

I have an issue with a for-loop.
If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why.
With 1, 2 or 3 it runs smoothly.
I attached the code below and marked the loop with //PROBLEM.

Thanks in advance!
Lydia

package org.apache.flink.contrib.lifescience.examples;

import edu.princeton.cs.algs4.Graph;
import edu.princeton.cs.algs4.SymbolDigraph;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

import java.util.*;

import static edu.princeton.cs.algs4.GraphGenerator.simple;

public class PowerIteration {

//path to input
static String input = null;
//path to output
static String output = null;
//number of iterations (default = 7)
static int iterations = 7;
//threshold
static double delta = 0.01;

public void run() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//read input file
DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);

DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
DataSet<Tuple3<Integer, Integer, Double>> eigenValue;

//initial:
//Approximate EigenVector by PowerIteration
eigenVector = PowerIteration_getEigenVector(matrixA);
//Approximate EigenValue by PowerIteration
eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
//Deflate original matrix
matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);

MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);

MyResult next = null;

//PROBLEM!!! get i eigenvalue gaps
for(int i=0;i<2;i++){
next = PowerIteration_routine(initial);
initial = next;
next.gap.print();
}

env.execute("Power Iteration");
}

public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
String filePath) {
CsvReader csvReader = env.readCsvFile(filePath);
csvReader.fieldDelimiter(",");
csvReader.includeFields("ttt");
return csvReader.types(Integer.class, Integer.class, Double.class);
}

public static final class ProjectJoinResultMapper implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {
@Override
public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {
Integer row = value.f0.f0;
Integer column = value.f1.f1;
Double product = value.f0.f2 * value.f1.f2;
return new Tuple3<Integer, Integer, Double>(row, column, product);
}
}

public static final class RQ implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {

@Override
public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {

return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
}
}

public static void main(String[] args) throws Exception {
if(args.length<2 || args.length > 4){
System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>");
System.exit(0);
}

input = args[0];
output = args[1];

if(args.length==3) {
iterations = Integer.parseInt(args[2]);
}
if(args.length==4){
delta = Double.parseDouble(args[3]);
}

new PowerIteration2().run();
}

public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> {

public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) {

if(!(candidate.f2 == old.f2)){
out.collect(candidate);
}

//if(Math.abs(candidate.f2-old.f2) > delta){
// out.collect(candidate);
//}

}
}

public static final class normalizeByMax implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {

public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {
return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
}
}

public static final class firstX implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {

public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {
return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
}
}

public static final class resetIndex implements
MapFunction<Tuple3<Integer, Integer, Double>,
Tuple3<Integer, Integer, Double>> {

public Tuple3<Integer, Integer, Double> map(
Tuple3<Integer, Integer, Double>value)
throws Exception {
return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
}
}


public static final class decBy1 implements
MapFunction<Tuple3<Integer, Integer, Double>,
Tuple3<Integer, Integer, Double>> {

public Tuple3<Integer, Integer, Double> map(
Tuple3<Integer, Integer, Double>value)
throws Exception {
return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2);
}
}

public static final class resetIndex2 implements
MapFunction<Tuple3<Integer, Integer, Double>,
Tuple3<Integer, Integer, Double>> {

public Tuple3<Integer, Integer, Double> map(
Tuple3<Integer, Integer, Double>value)
throws Exception {
return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
}
}

public static final class MatrixTimesValue implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {

@Override
public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {

return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
}
}

public static final class MatrixMinusMatrix implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {
@Override
public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {
Integer row = value.f0.f0;
Integer column = value.f0.f1;
Double result = value.f0.f2 - value.f1.f2;
return new Tuple3<Integer, Integer, Double>(row, column, result);
}
}

public static final class getGapCenter implements
MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
Tuple3<Integer, Integer, Double>>,
Tuple3<Integer, Integer, Double>> {
@Override
public Tuple3<Integer, Integer, Double> map(
Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
throws Exception {
Integer row = value.f0.f0;
Integer column = value.f0.f1;
Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
return new Tuple3<Integer, Integer, Double>(row, column, result);
}
}


public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception {

//get initial vector - which equals matrixA * [1, ... , 1]
DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2);

//normalize by maximum value
DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());

//BulkIteration to find dominant eigenvector
IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);

DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0)
.map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).
cross((matrixA.join(iteration).where(1).equalTo(0)
.map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2))
.map(new normalizeByMax());

DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
DataSet<Tuple3<Integer, Integer, Double>> eigenVector = iteration.closeWith(intermediate,diffs);

return eigenVector;
}

public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {

//determine now EigenValue by approximating the Rayleigh Quotient:
//get Ax
DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
.map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
//get Ax * x
DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
.map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);

//now x * x
DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
.map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);

return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
}

public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {

DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex());
DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
public boolean filter(Tuple3<Integer, Integer, Double> value) {
return value.f0 == 0;
}
});
firstVal = eigenValueReset.cross(firstVal).map(new firstX());
DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
public boolean filter(Tuple3<Integer, Integer, Double> value) {
return value.f0 == 0;
}
});
DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose());
DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2);
matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix());
matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
public boolean filter(Tuple3<Integer, Integer, Double> value) {
return (value.f0 != 0) && (value.f1 != 0);
}
});

return matrixA.map(new decBy1());
}

public MyResult PowerIteration_routine(MyResult initial) throws Exception {

//Approximate EigenVector by PowerIteration
DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
//Approximate EigenValue by PowerIteration
DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector);
//get gap
DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
//Deflate original matrix
DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);

return new MyResult(eigenVector,eigenValue,matrixA,gap);
}

public class MyResult {
DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
DataSet<Tuple3<Integer, Integer, Double>> gap;
DataSet<Tuple3<Integer, Integer, Double>> matrixA;

public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){
this.eigenVector = eigenVector;
this.eigenValue =eigenValue;
this.matrixA = matrixA;
}

public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){
this.eigenVector = eigenVector;
this.eigenValue =eigenValue;
this.matrixA = matrixA;
this.gap = gap;
}
}

}

Reply | Threaded
Open this post in threaded view
|

Re: for loop slow

Chiwan Park-2
Hi Lydia,

To build iterative algorithm on Flink, using API for iterations [1] would be better than using for-loop. Your program triggers multiple executions by multiple calling `next.gap.print()`. In each execution, Flink reads whole data redundantly and it cause performance to decrease.

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html

> On Mar 27, 2016, at 7:16 AM, Lydia Ickler <[hidden email]> wrote:
>
> Hi,
>
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
>
> Thanks in advance!
> Lydia
>
> package org.apache.flink.contrib.lifescience.examples;
>
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
>
> import java.util.*;
>
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
>
> public class PowerIteration {
>
>     //path to input
>     static String input = null;
>     //path to output
>     static String output = null;
>     //number of iterations (default = 7)
>     static int iterations = 7;
>     //threshold
>     static double delta = 0.01;
>
>     public void run() throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         //read input file
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
>
>         //initial:
>         //Approximate EigenVector by PowerIteration
>         eigenVector = PowerIteration_getEigenVector(matrixA);
>         //Approximate EigenValue by PowerIteration
>         eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
>         //Deflate original matrix
>         matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);
>
>         MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);
>
>         MyResult next = null;
>
>         //PROBLEM!!! get i eigenvalue gaps
>         for(int i=0;i<2;i++){
>             next = PowerIteration_routine(initial);
>             initial = next;
>             next.gap.print();
>         }
>
>         env.execute("Power Iteration");
>     }
>
>     public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
>                                                                           String filePath) {
>         CsvReader csvReader = env.readCsvFile(filePath);
>         csvReader.fieldDelimiter(",");
>         csvReader.includeFields("ttt");
>         return csvReader.types(Integer.class, Integer.class, Double.class);
>     }
>
>     public static final class ProjectJoinResultMapper implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f1.f1;
>             Double product = value.f0.f2 * value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, product);
>         }
>     }
>
>     public static final class RQ implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         if(args.length<2 || args.length > 4){
>             System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>");
>             System.exit(0);
>         }
>
>         input = args[0];
>         output = args[1];
>
>         if(args.length==3) {
>             iterations = Integer.parseInt(args[2]);
>         }
>         if(args.length==4){
>             delta = Double.parseDouble(args[3]);
>         }
>
>         new PowerIteration2().run();
>     }
>
>     public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> {
>
>         public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) {
>
>             if(!(candidate.f2 == old.f2)){
>                 out.collect(candidate);
>             }
>
>             //if(Math.abs(candidate.f2-old.f2) > delta){
>             //    out.collect(candidate);
>             //}
>
>         }
>     }
>
>     public static final class normalizeByMax implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
>         }
>     }
>
>     public static final class firstX implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
>         }
>     }
>
>     public static final class resetIndex implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
>         }
>     }
>
>
>     public static final class decBy1 implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2);
>         }
>     }
>
>     public static final class resetIndex2 implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
>         }
>     }
>
>     public static final class MatrixTimesValue implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
>         }
>     }
>
>     public static final class MatrixMinusMatrix implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f0.f1;
>             Double result = value.f0.f2 - value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, result);
>         }
>     }
>
>     public static final class getGapCenter implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f0.f1;
>             Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
>             return new Tuple3<Integer, Integer, Double>(row, column, result);
>         }
>     }
>
>
>     public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception {
>
>         //get initial vector - which equals matrixA * [1, ... , 1]
>         DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2);
>
>         //normalize by maximum value
>         DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());
>
>         //BulkIteration to find dominant eigenvector
>         IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);
>
>         DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).
>                 cross((matrixA.join(iteration).where(1).equalTo(0)
>                         .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2))
>                 .map(new normalizeByMax());
>
>         DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector  = iteration.closeWith(intermediate,diffs);
>
>         return eigenVector;
>     }
>
>     public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {
>
>         //determine now EigenValue by approximating the Rayleigh Quotient:
>         //get Ax
>         DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
>         //get Ax * x
>         DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
>
>         //now x * x
>         DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
>
>         return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
>     }
>
>     public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {
>
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex());
>         DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return value.f0 == 0;
>             }
>         });
>         firstVal = eigenValueReset.cross(firstVal).map(new firstX());
>         DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return value.f0 == 0;
>             }
>         });
>         DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose());
>         DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
>                 map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2);
>         matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix());
>         matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return (value.f0 != 0) && (value.f1 != 0);
>             }
>         });
>
>         return matrixA.map(new decBy1());
>     }
>
>     public MyResult PowerIteration_routine(MyResult initial) throws Exception {
>
>         //Approximate EigenVector by PowerIteration
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
>         //Approximate EigenValue by PowerIteration
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector);
>         //get gap
>         DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
>         //Deflate original matrix
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);
>
>         return new MyResult(eigenVector,eigenValue,matrixA,gap);
>     }
>
>     public class MyResult {
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
>         DataSet<Tuple3<Integer, Integer, Double>> gap;
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA;
>
>         public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){
>             this.eigenVector = eigenVector;
>             this.eigenValue =eigenValue;
>             this.matrixA = matrixA;
>         }
>
>         public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){
>             this.eigenVector = eigenVector;
>             this.eigenValue =eigenValue;
>             this.matrixA = matrixA;
>             this.gap = gap;
>         }
>     }
>
> }
>