Hi,
I want to do a simple matrix multiplication and am using the following code (see below).
For matrices of size 50x50 or 100x100 there is no problem, but already with 1000x1000 matrices it no longer works: the job gets stuck in the join step.
What am I doing wrong?
Best regards,
Lydia
package de.tuberlin.dima.aim3.assignment3;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;
/**
 * Multiplies a matrix with itself (computes A x A) using the Flink DataSet API.
 *
 * <p>The matrix is read as a data set of {@code (row, column, value)} triples. The
 * product is obtained by joining the data set with itself on
 * {@code left.column == right.row}, multiplying the two values of every joined pair,
 * and summing the products per {@code (row, column)} of the result.
 */
public class MatrixMultiplication {

  /** Path of the CSV file holding the input matrix; assigned in {@link #main}. */
  static String input = null;

  /** Path the result is written to as CSV; assigned in {@link #main}. */
  static String output = null;

  /**
   * Builds the multiplication plan on the matrix read from {@link #input}, attaches a
   * CSV sink at {@link #output}, and executes the job.
   *
   * <p>NOTE(review): every entry {@code (i, k)} on the left side is paired with every
   * entry {@code (k, j)} on the right side, so for a dense n x n input the join emits
   * roughly n^3 intermediate records (about 10^9 for 1000 x 1000) before the
   * aggregation. That is the likely reason large inputs appear to hang in the join.
   *
   * @throws Exception if building or executing the Flink job fails
   */
  public void run() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);

    matrixA
        // Pair A[i][k] (left, key = column) with A[k][j] (right, key = row).
        .join(matrixA).where(1).equalTo(0)
        // Turn each pair into the partial product (i, j, A[i][k] * A[k][j]).
        .map(new ProjectJoinResultMapper())
        // Add up all partial products that belong to the same result cell (i, j).
        .groupBy(0, 1).sum(2)
        .writeAsCsv(output);

    env.execute();
  }

  /**
   * Creates a data source that reads matrix entries from a comma-separated file.
   *
   * <p>The field mask {@code "fttt"} leaves out the first column of every line and
   * reads the following three as {@code (Integer, Integer, Double)}.
   * NOTE(review): presumably the skipped first column is a matrix name or id; confirm
   * against the actual input files.
   *
   * @param env the execution environment the source is registered with
   * @param filePath path of the CSV file to read
   * @return a data source of {@code (row, column, value)} triples
   */
  public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
      String filePath) {
    CsvReader csvReader = env.readCsvFile(filePath);
    csvReader.fieldDelimiter(',');
    csvReader.includeFields("fttt");
    return csvReader.types(Integer.class, Integer.class, Double.class);
  }

  /**
   * Converts one joined pair of matrix entries into a partial product of the result:
   * row from the left entry, column from the right entry, and the product of both values.
   */
  public static final class ProjectJoinResultMapper implements
      MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                         Tuple3<Integer, Integer, Double>>,
                  Tuple3<Integer, Integer, Double>> {

    /**
     * @param value the joined pair; {@code f0} is the left entry, {@code f1} the right entry
     * @return {@code (left.row, right.column, left.value * right.value)}
     */
    @Override
    public Tuple3<Integer, Integer, Double> map(
        Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
        throws Exception {
      Integer row = value.f0.f0;
      Integer column = value.f1.f1;
      Double product = value.f0.f2 * value.f1.f2;
      return new Tuple3<Integer, Integer, Double>(row, column, product);
    }
  }

  /**
   * Command-line entry point.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the result path
   * @throws Exception if the Flink job fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: MatrixMultiplication <input path> <result path>");
      // Missing arguments are an error: exit non-zero so that calling scripts can
      // detect the failed invocation (the previous status 0 signalled success).
      System.exit(1);
    }

    input = args[0];
    output = args[1];
    new MatrixMultiplication().run();
  }
}