Fwd: Problem applying a groupReduce function to a grouped data set

Lea Helmers
Hi!

When I try to apply a groupReduce function to a grouped data set, I get a compile error.

The data sets are created like this:

DataSet<Tuple3<String, String, String>> actorsTemp =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
                .fieldDelimiter("\t")
                .includeFields("1110")
                .types(String.class, String.class, String.class);

DataSet<Tuple3<String, String, String>> actresses =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
                .fieldDelimiter("\t")
                .includeFields("1110")
                .types(String.class, String.class, String.class);

DataSet<Tuple3<Float, Float, String>> ratings =
        env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
                .fieldDelimiter("\t")
                .includeFields("0111")
                .types(Float.class, Float.class, String.class)
                .filter(new NumberVotesFilter());

// merge actors and actresses
DataSet<Tuple3<String, String, String>> actors = actorsTemp.union(actresses);
// create weighted rating
DataSet<Tuple2<String, Float>> weightedRatings =
        ratings.map(new WeightedRatingCalculator());
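The field masks decide which tab-separated columns are read: "1110" keeps the first three columns and skips the fourth, while "0111" skips the first column and keeps the next three. A rough, Flink-free sketch of that selection for a single line follows; FieldMask and select are hypothetical names, the sketch does no quoting, escaping or type conversion, and it simply ignores columns past the end of the mask (an assumption of the sketch, not a statement about Flink's reader).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, Flink-free model of a CSV field mask such as "1110".
// No quoting, escaping or type conversion is handled.
class FieldMask {

    // Returns the columns of a delimited line whose mask character is '1'.
    // Columns beyond the end of the mask are ignored in this sketch.
    static List<String> select(String line, String delimiter, String mask) {
        // limit -1 keeps trailing empty columns instead of discarding them
        String[] columns = line.split(Pattern.quote(delimiter), -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < columns.length && i < mask.length(); i++) {
            if (mask.charAt(i) == '1') {
                kept.add(columns[i]);
            }
        }
        return kept;
    }
}
```

For example, select("a\tb\tc\td", "\t", "1110") keeps a, b and c, and the same line with "0111" keeps b, c and d.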

This is what I'm trying in the main method:

actors.map(new JoinNames())
        .join(weightedRatings)
        .where(1).equalTo(0)
        .projectFirst(0).projectSecond(1)
        .groupBy(0)
        .reduceGroup(new MeanRatingCalculator())
        .first(10).print();


And here is the GroupReduce function I wrote:

public static class MeanRatingCalculator implements
        GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
                       Collector<Tuple3<String, Float, Integer>> out) throws Exception {

        String name = null;
        float ratings = 0F;
        int numberOfMovies = 0;
        for (Tuple2<String, Float> a : ratedActors) {
            // store the name (the same for every element of the group)
            name = a.f0;
            // update the sum of the ratings and the number of movies
            ratings += a.f1;
            numberOfMovies++;
        }
        // emit name, average rating and number of movies
        out.collect(new Tuple3<String, Float, Integer>(
                name, ratings / numberOfMovies, numberOfMovies));
    }
}
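To see what MeanRatingCalculator is meant to produce, here is a Flink-free sketch of the same computation: ratings are grouped by name (the role groupBy(0) plays) and each group is reduced to (name, average rating, number of movies). MeanRatingSketch, Result and meanPerActor are hypothetical names; a TreeMap is used only to make the output order deterministic, and the order of groups in a Flink result should not be relied on in the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of the per-group averaging.
class MeanRatingSketch {

    // (name, mean rating, number of movies), mirroring Tuple3<String, Float, Integer>.
    static final class Result {
        final String name;
        final float mean;
        final int count;

        Result(String name, float mean, int count) {
            this.name = name;
            this.mean = mean;
            this.count = count;
        }

        @Override
        public String toString() {
            return name + "," + mean + "," + count;
        }
    }

    // Reduces one group: all ratings that share the same name.
    static Result reduceGroup(String name, List<Float> ratings) {
        float sum = 0f;  // primitive accumulator, no re-boxing on each +=
        int count = 0;
        for (float r : ratings) {
            sum += r;
            count++;
        }
        return new Result(name, sum / count, count);
    }

    // Groups (name, rating) pairs by name, then reduces each group.
    static List<Result> meanPerActor(List<String> names, List<Float> ratings) {
        Map<String, List<Float>> groups = new TreeMap<>();
        for (int i = 0; i < names.size(); i++) {
            groups.computeIfAbsent(names.get(i), k -> new ArrayList<>()).add(ratings.get(i));
        }
        List<Result> out = new ArrayList<>();
        for (Map.Entry<String, List<Float>> e : groups.entrySet()) {
            out.add(reduceGroup(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

With names A, B, A and ratings 8.0, 6.0, 7.0 this yields A with mean 7.5 over 2 movies and B with mean 6.0 over 1 movie.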


I get the following error message when I try to compile the code:

java: method reduceGroup in class org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be applied to given types;
  required: org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>
  found: de.tub.dima.TopActors.MeanRatingCalculator
  reason: no instance(s) of type variable(s) R exist so that argument type de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter type org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R>


I can't figure out what the problem might be and would be very grateful for any help! I hope I have given all the necessary information. I'm using Ubuntu 14.04 and IntelliJ IDEA as my IDE.

Thank you very much,

Lea

Re: Fwd: Problem applying a groupReduce function to a grouped data set

Martin Junghanns-2
Hi,

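Just an idea: the source code documentation states that projectFirst and
projectSecond lose type information. In a chained call the Java compiler
cannot infer which tuple type the projection returns, so it falls back to
the plain Tuple type, which would explain why your reduceGroup expects a
GroupReduceFunction<Tuple, R>.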
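The compiler behaviour can be reproduced without Flink. In the sketch below, Tuple, Pair, Data, project and apply are hypothetical stand-ins, not Flink classes: project is a generic method whose output type OUT is chosen by the caller (like a join projection), and apply needs a function whose input type matches the element type exactly (like reduceGroup). Used as the receiver of a chained call, project infers OUT as its bound Tuple; an explicit type witness or an intermediate typed variable fixes that. By analogy, assigning the projection to a DataSet<Tuple2<String, Float>> variable before calling groupBy, or writing .projectFirst(0).<Tuple2<String, Float>>projectSecond(1), may also work in the Flink program, but treat that as something to try rather than a verified fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Flink-free model of the typing problem. None of these
// classes are Flink's; they only mimic the shape of its generic methods.
class ProjectionInference {

    // Untyped base record, playing the role of the plain Tuple type.
    static class Tuple {
        final Object[] fields;
        Tuple(Object... fields) { this.fields = fields; }
    }

    // Typed two-field record, playing the role of Tuple2<A, B>.
    static final class Pair<A, B> extends Tuple {
        Pair(A a, B b) { super(a, b); }
        @SuppressWarnings("unchecked") A f0() { return (A) fields[0]; }
        @SuppressWarnings("unchecked") B f1() { return (B) fields[1]; }
    }

    static final class Data<T> {
        final List<T> items;
        Data(List<T> items) { this.items = items; }

        // Like a projection: the caller picks OUT; the int arguments
        // give the compiler nothing to infer OUT from.
        @SuppressWarnings("unchecked")
        <OUT extends Tuple> Data<OUT> project(int i, int j) {
            List<OUT> out = new ArrayList<>();
            for (T item : items) {
                Tuple t = (Tuple) item;
                Pair<Object, Object> p = new Pair<>(t.fields[i], t.fields[j]);
                out.add((OUT) p);
            }
            return new Data<>(out);
        }

        // Like reduceGroup: the function's input type must be exactly T.
        <R> Data<R> apply(Function<T, R> fn) {
            List<R> out = new ArrayList<>();
            for (T item : items) {
                out.add(fn.apply(item));
            }
            return new Data<>(out);
        }
    }

    static String describe(Pair<String, Float> p) {
        return p.f0() + "=" + p.f1();
    }

    static List<String> run() {
        List<Tuple> rows = new ArrayList<>();
        rows.add(new Tuple("Alice", "Movie", 7.5f));
        Data<Tuple> joined = new Data<>(rows);

        // Does not compile: as a chain receiver, project(0, 2) infers OUT = Tuple,
        // so apply(...) would need a Function<Tuple, R>:
        // joined.project(0, 2).apply(ProjectionInference::describe);

        // Fix 1: explicit type witness on the generic call.
        Data<String> viaWitness =
                joined.<Pair<String, Float>>project(0, 2).apply(ProjectionInference::describe);

        // Fix 2: assign to a typed variable so OUT is inferred from the target type.
        Data<Pair<String, Float>> typed = joined.project(0, 2);
        Data<String> viaVariable = typed.apply(ProjectionInference::describe);

        List<String> results = new ArrayList<>(viaWitness.items);
        results.addAll(viaVariable.items);
        return results;
    }
}
```

Both fixes give the compiler a concrete target type for OUT, which is all the chained version is missing.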

I found an example [1] that calls .types() to declare the result types,
but that method is deprecated. What I would try instead is to replace
projectFirst and projectSecond with a JoinFunction that outputs a
Tuple2<String, Float> "manually", like so:

actors.map(new JoinNames())
    .join(weightedRatings)
    .where(1).equalTo(0)
    .with(new JoinFunction<
            TypeLeft,               // placeholder: output type of JoinNames()
            Tuple2<String, Float>,  // element type of weightedRatings
            Tuple2<String, Float>>() {

        @Override
        public Tuple2<String, Float> join(
                TypeLeft left,
                Tuple2<String, Float> right) throws Exception {
            // keep the name from the left side and the rating from the right side
            return new Tuple2<>(left.f0, right.f1);
        }
    })
    // f0 of the first input and f1 of the second input are copied unchanged
    .withForwardedFieldsFirst("f0")
    .withForwardedFieldsSecond("f1")
    .groupBy(0)
    .reduceGroup(new MeanRatingCalculator())
    .first(10)
    .print();
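What this join is expected to produce can be sketched in plain Java as a simple in-memory hash join: index the right input by its key (field 0), probe it with each left row's key (field 1), and emit (left name, right rating) for every match; unmatched rows are dropped, as in an inner join. HashJoinSketch, Rating and the String[] layout of the left rows (index 0 = name, index 1 = key) are hypothetical, and no claim is made about the join strategy Flink itself picks.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of
//   left.join(right).where(1).equalTo(0).with(l, r -> (l.f0, r.f1))
class HashJoinSketch {

    // Right-side row, mirroring Tuple2<String, Float> (key, weighted rating).
    static final class Rating {
        final String key;
        final float value;
        Rating(String key, float value) { this.key = key; this.value = value; }
    }

    // Left rows are assumed to be {name, key}.
    static List<Map.Entry<String, Float>> join(List<String[]> left, List<Rating> right) {
        // Build phase: index the right input by its key (field 0).
        Map<String, List<Rating>> byKey = new HashMap<>();
        for (Rating r : right) {
            byKey.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        }
        // Probe phase: look up each left row by its key (field 1).
        List<Map.Entry<String, Float>> out = new ArrayList<>();
        for (String[] l : left) {
            for (Rating r : byKey.getOrDefault(l[1], Collections.emptyList())) {
                // Like the JoinFunction: keep the left name and the right rating.
                out.add(new AbstractMap.SimpleEntry<>(l[0], r.value));
            }
        }
        return out;
    }
}
```

Joining left rows {Alice, m1}, {Bob, m2}, {Alice, m3} with ratings m1 = 8.0 and m3 = 6.0 gives Alice=8.0 and Alice=6.0; Bob has no rating and is dropped.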

Hope this helps.

Best,
Martin

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/examples.html#relational-query

On 01.11.2015 13:52, Lea Helmers wrote:
