Hi!
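When I try to apply a groupReduce function to a data set I get an error.

The data set is created like this:

    DataSet<Tuple3<String, String, String>> actorsTemp =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
            .fieldDelimiter("\t")
            .includeFields("1110")
            .types(String.class, String.class, String.class);

    DataSet<Tuple3<String, String, String>> actresses =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
            .fieldDelimiter("\t")
            .includeFields("1110")
            .types(String.class, String.class, String.class);

    DataSet<Tuple3<Float, Float, String>> ratings =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
            .fieldDelimiter("\t")
            .includeFields("0111")
            .types(Float.class, Float.class, String.class)
            .filter(new NumberVotesFilter());

    //merge actors and actresses
    DataSet<Tuple3<String, String, String>> actors =
        actorsTemp.union(actresses);
    //create weighted rating
    DataSet<Tuple2<String, Float>> weightedRatings =
        ratings.map(new WeightedRatingCalculator());

THIS IS WHAT I'M TRYING IN THE MAIN METHOD:

    actors.map(new JoinNames())
        .join(weightedRatings)
        .where(1).equalTo(0)
        .projectFirst(0).projectSecond(1)
        .groupBy(0)
        .reduceGroup(new MeanRatingCalculator())
        .first(10).print();

And here is the GroupReduce function I wrote:

    public static class MeanRatingCalculator implements
            GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {

        public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
                Collector<Tuple3<String, Float, Integer>> out) throws Exception {

            String name = null;
            Float ratings = 0F;
            int numberOfMovies = 0;
            for (Tuple2<String, Float> a : ratedActors) {
                //store the name
                name = a.f0;
                //update the sum of the ratings and number of movies
                ratings += a.f1;
                numberOfMovies++;
            }
            // emit name, average rating and number of films
            out.collect(new Tuple3<String, Float, Integer>(name,
                ratings/(float)numberOfMovies, numberOfMovies));
        }
    }

I get the following error message when I try to compile the code:

    java: method reduceGroup in class
    org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be
    applied to given types;
      required: org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
      found: de.tub.dima.TopActors.MeanRatingCalculator
      reason: no instance(s) of type variable(s) R exist so that argument type
      de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter
      type org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>

I can't figure out what the problem might be and would be very grateful for any help!! I hope I have given all the necessary information. I'm using Ubuntu 14.04 and IntelliJ Idea as IDE.

Thank you very much,

Lea
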
Hi,
just an idea: the source code documentation states that projectFirst and projectSecond lose type information, which could explain why reduceGroup expects a GroupReduceFunction<Tuple, R>.

I found an example [1] that calls .types() to define the returned types, but this method is deprecated.

What I would try is to replace projectFirst and projectSecond with a JoinFunction and output Tuple2<String, Float> "manually", like so:

    actors.map(new JoinNames())
        .join(weightedRatings)
        .where(1).equalTo(0)
        .with(new JoinFunction<
                TypeLeft, // output type of JoinNames()
                Tuple2<String, Float>,
                Tuple2<String, Float>>() {
            @Override
            public Tuple2<String, Float> join(
                    TypeLeft left, Tuple2<String, Float> right) throws Exception {
                return new Tuple2<>(left.f0, right.f1);
            }
        })
        .withForwardedFieldsFirst("f0")
        .withForwardedFieldsSecond("f1")
        .groupBy(0)
        .reduceGroup(new MeanRatingCalculator())
        .first(10)
        .print();

Hope this helps.

Best,
Martin

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/examples.html#relational-query

On 01.11.2015 13:52, Lea Helmers wrote:
> Hi!
>
> When I try to apply a groupReduce function to a data set I get an error.
>
> The data set is created like this:
>
> DataSet<Tuple3<String, String, String>> actorsTemp =
>     env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
>         .fieldDelimiter("\t")
>         .includeFields("1110")
>         .types(String.class, String.class, String.class);
>
> DataSet<Tuple3<String, String, String>> actresses =
>     env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
>         .fieldDelimiter("\t")
>         .includeFields("1110")
>         .types(String.class, String.class, String.class);
>
> DataSet<Tuple3<Float, Float, String>> ratings =
>     env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
>         .fieldDelimiter("\t")
>         .includeFields("0111")
>         .types(Float.class, Float.class, String.class)
>         .filter(new NumberVotesFilter());
>
> //merge actors and actresses
> DataSet<Tuple3<String, String, String>> actors =
>     actorsTemp.union(actresses);
> //create weighted rating
> DataSet<Tuple2<String, Float>> weightedRatings =
>     ratings.map(new WeightedRatingCalculator());
>
> THIS IS WHAT I'M TRYING IN THE MAIN METHOD:
>
> actors.map(new JoinNames())
>     .join(weightedRatings)
>     .where(1).equalTo(0)
>     .projectFirst(0).projectSecond(1)
>     .groupBy(0)
>     .reduceGroup(new MeanRatingCalculator())
>     .first(10).print();
>
> And here is the GroupReduce function I wrote:
>
> public static class MeanRatingCalculator implements
>         GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {
>
>     public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
>             Collector<Tuple3<String, Float, Integer>> out) throws Exception {
>
>         String name = null;
>         Float ratings = 0F;
>         int numberOfMovies = 0;
>         for (Tuple2<String, Float> a : ratedActors) {
>             //store the name
>             name = a.f0;
>             //update the sum of the ratings and number of movies
>             ratings += a.f1;
>             numberOfMovies++;
>         }
>         // emit name, average rating and number of films
>         out.collect(new Tuple3<String, Float, Integer>(name,
>             ratings/(float)numberOfMovies, numberOfMovies));
>     }
> }
>
> I get the following error message when I try to compile the code:
>
> java: method reduceGroup in class
> org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be
> applied to given types;
>   required: org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
>   found: de.tub.dima.TopActors.MeanRatingCalculator
>   reason: no instance(s) of type variable(s) R exist so that argument type
>   de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter
>   type org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
>
> I can't figure out what the problem might be and would be very grateful
> for any help!! I hope I have given all the necessary information. I'm
> using Ubuntu 14.04 and IntelliJ Idea as IDE.
>
> Thank you very much,
>
> Lea
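
For anyone reading this thread later, here is a minimal sketch in plain Java (no Flink on the classpath) of why the compiler rejects the call, assuming the explanation above is right that the projection leaves the element type as the base class Tuple. Every name in it (ProjectionTypeSketch, Grouping, GroupReduce, MeanRating, and the local Tuple and Tuple2 classes) is a hypothetical stand-in written for illustration, not the real Flink API. It only mirrors the shape of reduceGroup: Java generics are invariant, so a function declared for Tuple2<String, Float> is not accepted where a function over Tuple is required.

```java
import java.util.ArrayList;
import java.util.List;

public class ProjectionTypeSketch {

    // Hypothetical stand-ins for Flink's Tuple and Tuple2; not the real classes.
    static class Tuple {}

    static class Tuple2<A, B> extends Tuple {
        final A f0;
        final B f1;

        Tuple2(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Stand-in for GroupReduceFunction<IN, OUT>, simplified to return one value.
    interface GroupReduce<IN, OUT> {
        OUT reduce(Iterable<IN> values);
    }

    // Stand-in for a grouped data set with element type T.
    static class Grouping<T> {
        private final List<T> values;

        Grouping(List<T> values) {
            this.values = values;
        }

        // Same shape as reduceGroup: the function's input type must be exactly T.
        <R> R reduceGroup(GroupReduce<T, R> reducer) {
            return reducer.reduce(values);
        }
    }

    // Declared for Tuple2<String, Float>, like MeanRatingCalculator's input type.
    static class MeanRating implements GroupReduce<Tuple2<String, Float>, Float> {
        @Override
        public Float reduce(Iterable<Tuple2<String, Float>> values) {
            float sum = 0f;
            int count = 0;
            for (Tuple2<String, Float> v : values) {
                sum += v.f1;
                count++;
            }
            return count == 0 ? 0f : sum / count;
        }
    }

    static float demo() {
        List<Tuple2<String, Float>> rows = new ArrayList<>();
        rows.add(new Tuple2<>("A", 6.0f));
        rows.add(new Tuple2<>("A", 8.0f));

        // Element type is known, so the typed function is accepted.
        Grouping<Tuple2<String, Float>> typed = new Grouping<>(rows);
        float mean = typed.reduceGroup(new MeanRating());

        // If the element type is only the base class, the same call is rejected:
        // Grouping<Tuple> untyped = new Grouping<>(new ArrayList<Tuple>());
        // untyped.reduceGroup(new MeanRating()); // does not compile: needs GroupReduce<Tuple, R>

        return mean;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the fix is to make sure the grouping carries the concrete tuple type, which is what the JoinFunction suggested above does by declaring Tuple2<String, Float> as its output.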