MatrixMultiplication

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

MatrixMultiplication

Lydia Ickler
Hi, 

I want to do a simple matrix multiplication using the following code (see bottom).
For matrices of size 50x50 or 100x100 there is no problem. But with matrices of size 1000x1000 it no longer works and seems to get stuck in the join step.
What am I doing wrong?

Best regards, 
Lydia

package de.tuberlin.dima.aim3.assignment3;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;


/**
 * Squares a sparse matrix (computes C = A * A) with the Flink DataSet API.
 *
 * <p>The input is a CSV file in coordinate form. The first comma-separated field of each line
 * is skipped; the remaining three are read as (row, column, value). Entries that are absent
 * from the file behave as zeros, because the join only pairs entries that exist. The result
 * is written as CSV triples (row, column, value).
 *
 * <p>Cost: for a dense n x n matrix the join emits n^3 partial products before they are
 * summed, so a 10x larger matrix takes on the order of 1000x longer.
 */
public class MatrixMultiplication {

    /** Input path read by {@link #run()}; assigned from the command line by {@link #main}. */
    static String input = null;

    /** Output path read by {@link #run()}; assigned from the command line by {@link #main}. */
    static String output = null;

    /**
     * Squares the matrix at {@link #input} and writes the product to {@link #output}.
     *
     * @throws IllegalArgumentException if either static path has not been set
     * @throws Exception if the Flink job fails
     */
    public void run() throws Exception {
        run(input, output);
    }

    /**
     * Squares the matrix stored at {@code inputPath} and writes the product to {@code outputPath}.
     *
     * @param inputPath CSV file holding the matrix entries (see class documentation)
     * @param outputPath destination for the (row, column, value) CSV result
     * @throws IllegalArgumentException if either path is {@code null}
     * @throws Exception if the Flink job fails
     */
    public void run(String inputPath, String outputPath) throws Exception {
        if (inputPath == null || outputPath == null) {
            throw new IllegalArgumentException(
                    "input and output paths must be set (input=" + inputPath
                            + ", output=" + outputPath + ")");
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, inputPath);

        // Self-join pairs A[i][k] (left) with A[k][j] (right): the left column index (field 1)
        // must equal the right row index (field 0). Each pair becomes the partial product
        // (i, j, A[i][k] * A[k][j]); summing field 2 per (i, j) yields C[i][j].
        matrixA.join(matrixA).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper())
                .groupBy(0, 1)
                .sum(2)
                .writeAsCsv(outputPath);

        env.execute();
    }

    /**
     * Reads matrix entries from a comma-separated file.
     *
     * <p>The field mask {@code "fttt"} skips the first column and keeps the next three, which
     * are parsed as (row, column, value). NOTE(review): presumably the skipped column is an
     * id or matrix tag; confirm against the input files.
     *
     * @param env the execution environment that creates the source
     * @param filePath path of the CSV file
     * @return a data source of (row, column, value) triples
     */
    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
            String filePath) {
        CsvReader csvReader = env.readCsvFile(filePath);
        csvReader.fieldDelimiter(',');
        csvReader.includeFields("fttt");
        return csvReader.types(Integer.class, Integer.class, Double.class);
    }

    /**
     * Turns one joined pair (A[i][k], A[k][j]) into the partial product
     * (i, j, A[i][k] * A[k][j]).
     */
    public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;       // i: row of the left entry
            Integer column = value.f1.f1;    // j: column of the right entry
            Double product = value.f0.f2 * value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, product);
        }
    }

    /**
     * Command-line entry point: {@code MatrixMultiplication <input path> <result path>}.
     *
     * @param args input path followed by output path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MatrixMultiplication <input path> <result path>");
            // Non-zero status so shells and schedulers can detect the invocation error.
            System.exit(1);
        }
        input = args[0];
        output = args[1];
        new MatrixMultiplication().run();
    }

}

Reply | Threaded
Open this post in threaded view
|

Re: MatrixMultiplication

Till Rohrmann
Hi Lydia,

Since matrix multiplication is O(n^3), I would expect the 1000 x 1000 multiplication to simply take about 1000 times longer than the 100 x 100 one. Have you waited that long to see whether it completes, or is there another problem?

Cheers,
Till

On Mon, Jan 25, 2016 at 2:13 PM, Lydia Ickler <[hidden email]> wrote:
Hi, 

I want to do a simple matrix multiplication using the following code (see bottom).
For matrices of size 50x50 or 100x100 there is no problem. But with matrices of size 1000x1000 it no longer works and seems to get stuck in the join step.
What am I doing wrong?

Best regards, 
Lydia

package de.tuberlin.dima.aim3.assignment3;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;


/**
 * Squares a sparse matrix (computes C = A * A) with the Flink DataSet API.
 *
 * <p>The input is a CSV file in coordinate form. The first comma-separated field of each line
 * is skipped; the remaining three are read as (row, column, value). Entries that are absent
 * from the file behave as zeros, because the join only pairs entries that exist. The result
 * is written as CSV triples (row, column, value).
 *
 * <p>Cost: for a dense n x n matrix the join emits n^3 partial products before they are
 * summed, so a 10x larger matrix takes on the order of 1000x longer.
 */
public class MatrixMultiplication {

    /** Input path read by {@link #run()}; assigned from the command line by {@link #main}. */
    static String input = null;

    /** Output path read by {@link #run()}; assigned from the command line by {@link #main}. */
    static String output = null;

    /**
     * Squares the matrix at {@link #input} and writes the product to {@link #output}.
     *
     * @throws IllegalArgumentException if either static path has not been set
     * @throws Exception if the Flink job fails
     */
    public void run() throws Exception {
        run(input, output);
    }

    /**
     * Squares the matrix stored at {@code inputPath} and writes the product to {@code outputPath}.
     *
     * @param inputPath CSV file holding the matrix entries (see class documentation)
     * @param outputPath destination for the (row, column, value) CSV result
     * @throws IllegalArgumentException if either path is {@code null}
     * @throws Exception if the Flink job fails
     */
    public void run(String inputPath, String outputPath) throws Exception {
        if (inputPath == null || outputPath == null) {
            throw new IllegalArgumentException(
                    "input and output paths must be set (input=" + inputPath
                            + ", output=" + outputPath + ")");
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, inputPath);

        // Self-join pairs A[i][k] (left) with A[k][j] (right): the left column index (field 1)
        // must equal the right row index (field 0). Each pair becomes the partial product
        // (i, j, A[i][k] * A[k][j]); summing field 2 per (i, j) yields C[i][j].
        matrixA.join(matrixA).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper())
                .groupBy(0, 1)
                .sum(2)
                .writeAsCsv(outputPath);

        env.execute();
    }

    /**
     * Reads matrix entries from a comma-separated file.
     *
     * <p>The field mask {@code "fttt"} skips the first column and keeps the next three, which
     * are parsed as (row, column, value). NOTE(review): presumably the skipped column is an
     * id or matrix tag; confirm against the input files.
     *
     * @param env the execution environment that creates the source
     * @param filePath path of the CSV file
     * @return a data source of (row, column, value) triples
     */
    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
            String filePath) {
        CsvReader csvReader = env.readCsvFile(filePath);
        csvReader.fieldDelimiter(',');
        csvReader.includeFields("fttt");
        return csvReader.types(Integer.class, Integer.class, Double.class);
    }

    /**
     * Turns one joined pair (A[i][k], A[k][j]) into the partial product
     * (i, j, A[i][k] * A[k][j]).
     */
    public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;       // i: row of the left entry
            Integer column = value.f1.f1;    // j: column of the right entry
            Double product = value.f0.f2 * value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, product);
        }
    }

    /**
     * Command-line entry point: {@code MatrixMultiplication <input path> <result path>}.
     *
     * @param args input path followed by output path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MatrixMultiplication <input path> <result path>");
            // Non-zero status so shells and schedulers can detect the invocation error.
            System.exit(1);
        }
        input = args[0];
        output = args[1];
        new MatrixMultiplication().run();
    }

}


Reply | Threaded
Open this post in threaded view
|

Re: MatrixMultiplication

Lydia Ickler
Hi Till,

thanks for your reply :)
Yes, it finished after ~27 minutes…

Best regards, 
Lydia

Am 25.01.2016 um 14:27 schrieb Till Rohrmann <[hidden email]>:

Hi Lydia,

Since matrix multiplication is O(n^3), I would expect the 1000 x 1000 multiplication to simply take about 1000 times longer than the 100 x 100 one. Have you waited that long to see whether it completes, or is there another problem?

Cheers,
Till

On Mon, Jan 25, 2016 at 2:13 PM, Lydia Ickler <[hidden email]> wrote:
Hi, 

I want to do a simple matrix multiplication using the following code (see bottom).
For matrices of size 50x50 or 100x100 there is no problem. But with matrices of size 1000x1000 it no longer works and seems to get stuck in the join step.
What am I doing wrong?

Best regards, 
Lydia

package de.tuberlin.dima.aim3.assignment3;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;


/**
 * Squares a sparse matrix (computes C = A * A) with the Flink DataSet API.
 *
 * <p>The input is a CSV file in coordinate form. The first comma-separated field of each line
 * is skipped; the remaining three are read as (row, column, value). Entries that are absent
 * from the file behave as zeros, because the join only pairs entries that exist. The result
 * is written as CSV triples (row, column, value).
 *
 * <p>Cost: for a dense n x n matrix the join emits n^3 partial products before they are
 * summed, so a 10x larger matrix takes on the order of 1000x longer.
 */
public class MatrixMultiplication {

    /** Input path read by {@link #run()}; assigned from the command line by {@link #main}. */
    static String input = null;

    /** Output path read by {@link #run()}; assigned from the command line by {@link #main}. */
    static String output = null;

    /**
     * Squares the matrix at {@link #input} and writes the product to {@link #output}.
     *
     * @throws IllegalArgumentException if either static path has not been set
     * @throws Exception if the Flink job fails
     */
    public void run() throws Exception {
        run(input, output);
    }

    /**
     * Squares the matrix stored at {@code inputPath} and writes the product to {@code outputPath}.
     *
     * @param inputPath CSV file holding the matrix entries (see class documentation)
     * @param outputPath destination for the (row, column, value) CSV result
     * @throws IllegalArgumentException if either path is {@code null}
     * @throws Exception if the Flink job fails
     */
    public void run(String inputPath, String outputPath) throws Exception {
        if (inputPath == null || outputPath == null) {
            throw new IllegalArgumentException(
                    "input and output paths must be set (input=" + inputPath
                            + ", output=" + outputPath + ")");
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, inputPath);

        // Self-join pairs A[i][k] (left) with A[k][j] (right): the left column index (field 1)
        // must equal the right row index (field 0). Each pair becomes the partial product
        // (i, j, A[i][k] * A[k][j]); summing field 2 per (i, j) yields C[i][j].
        matrixA.join(matrixA).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper())
                .groupBy(0, 1)
                .sum(2)
                .writeAsCsv(outputPath);

        env.execute();
    }

    /**
     * Reads matrix entries from a comma-separated file.
     *
     * <p>The field mask {@code "fttt"} skips the first column and keeps the next three, which
     * are parsed as (row, column, value). NOTE(review): presumably the skipped column is an
     * id or matrix tag; confirm against the input files.
     *
     * @param env the execution environment that creates the source
     * @param filePath path of the CSV file
     * @return a data source of (row, column, value) triples
     */
    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
            String filePath) {
        CsvReader csvReader = env.readCsvFile(filePath);
        csvReader.fieldDelimiter(',');
        csvReader.includeFields("fttt");
        return csvReader.types(Integer.class, Integer.class, Double.class);
    }

    /**
     * Turns one joined pair (A[i][k], A[k][j]) into the partial product
     * (i, j, A[i][k] * A[k][j]).
     */
    public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;       // i: row of the left entry
            Integer column = value.f1.f1;    // j: column of the right entry
            Double product = value.f0.f2 * value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, product);
        }
    }

    /**
     * Command-line entry point: {@code MatrixMultiplication <input path> <result path>}.
     *
     * @param args input path followed by output path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MatrixMultiplication <input path> <result path>");
            // Non-zero status so shells and schedulers can detect the invocation error.
            System.exit(1);
        }
        input = args[0];
        output = args[1];
        new MatrixMultiplication().run();
    }

}