package org.apache.flink.allIn; import java.util.ArrayList; import java.util.Iterator; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.util.Collector; import org.apache.flink.core.fs.FileSystem.WriteMode; @SuppressWarnings("serial") public class StratosphereMultiFlink6Job{ public static void main(String[] args) throws Exception { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet> InputRecs = env.readTextFile(args[0]) .map(new SourceMapper()); DataSet> Uni = InputRecs.map(new UniMapper()) .groupBy(0) .aggregate(Aggregations.SUM, 1) .joinWithHuge(InputRecs).where(0).equalTo(0) .projectSecond(0,1,2).projectFirst(1) .types(Integer.class, Integer.class, Integer.class, Integer.class) ; DataSet> sim = Uni // .joinWithHuge(Uni).where(0).equalTo(0).with(new JoinCondition()) // .groupBy(0,1) // .reduceGroup(new ReduceSim2()); // .groupBy(1) .reduceGroup(new ReduceSim1()) .groupBy(0,1) .reduceGroup(new ReduceSim2()); //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// sim.writeAsText(args[1], WriteMode.OVERWRITE); // emit result //sim.print(); // execute program env.execute("WordCount Example"); } // public class JoinCondition // extends JoinFunction, Tuple4 // , Tuple7> { // // @Override // public Tuple7 join(Tuple4 tup1, Tuple4 tup2) { // // multiply the points and rating and construct a new output tuple // if(tup1.f0.intValue()( // tup1.f0.intValue(),tup2.f0.intValue(),tup1.f3.intValue(),tup2.f3.intValue(),String.valueOf(tup1.f1),tup1.f2.intValue(),tup2.f2.intValue());//i j//uni i//uni j// ak// fik// fjk // }else return null; // } // } // // User Functions // /** * Implements the string tokenizer that splits sentences into words as a user-defined * FlatMapFunction. The function takes a line (String) and splits it into * multiple pairs in the form of "(word,1)" (Tuple2). */ public static class SourceMapper implements MapFunction> { @Override public Tuple3 map (String in) { String[] tuple = in.split(" "); return (new Tuple3(Integer.parseInt(tuple[0]),Integer.parseInt(tuple[1]),Integer.parseInt(tuple[2]))); } } @ConstantFields("0->0") public static class UniMapper implements MapFunction,Tuple2> { @Override public Tuple2 map (Tuple3 in) { return (new Tuple2(in.f0.intValue(),in.f2.intValue())); } } // @ConstantFields("1->4") public static class ReduceSim1 implements GroupReduceFunction,Tuple7> { @Override public void reduce(Iterable> in, Collector> out){ ArrayList> tempRecList = new ArrayList>(); for (Tuple4 tup1:in) {// TODO warum sind hier mehrmals die Identischen Tupel drin? //if(tempRecList.contains(rec1)){continue;} //print tempRecList //p(rec1); //p(tempRecList.toString()); for(Tuple4 tup2: tempRecList){// input recs sind: i, ak, fik, uni if(tup1.f0( tup1.f0.intValue(),tup2.f0.intValue(),tup1.f3.intValue(),tup2.f3.intValue(),tup1.f1.intValue(),tup1.f2.intValue(),tup2.f2.intValue()));//i j//uni i//uni j// ak// fik// fjk }else{ out.collect(new Tuple7( tup2.f0.intValue(),tup1.f0.intValue(),tup2.f3.intValue(),tup1.f3.intValue(),tup2.f1.intValue(),tup2.f2.intValue(),tup1.f2.intValue()));//i j//uni i//uni j// ak// fik// fjk } } tempRecList.add(new Tuple4( tup1.f0.intValue(), tup1.f1.intValue(), tup1.f2.intValue(), tup1.f3.intValue())); } tempRecList.clear(); //p("ReduceErgebnis: " + current.getField(0, IntValue.class)+" "+ count); } } public static class ReduceSim2 implements GroupReduceFunction,Tuple3> { @Override public void reduce(Iterable> in, Collector> out) { double sim = 0; int unii = 0; int unij = 0; Iterator> it = in.iterator(); Tuple7 current = null; //unilateral partial results (|*|) for Ruzicka sim double sum = 0; while (it.hasNext()) { current = it.next(); sum += Math.min(current.f5, current.f6); } unii = current.f2; unij = current.f3; sim = sum/(unii+unij-sum); //p("sim: "+sim+", unii: "+unii+", unij: "+unij+", sum: "+sum); out.collect(new Tuple3( current.f0.intValue(), current.f1.intValue(), sim)); // p("SimErgebnis: " + current.f0+" " // + "" + current.f1+" " // + "" + Double.toString(sim)); } } // public static void p(Object o){ // System.out.println(o); // }; }