Hi Mailing List,
I probably have a problem with lazy evaluation: depending on the output type of my last transformation (a GroupReduce), the integrated Flink mini cluster does not start. I have done the following:

    // Configuration
    Configuration parameters = new Configuration();
    parameters.setString("path", "generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
    parameters.setString("output", "result_MR_GPSRS.csv");
    parameters.setInteger("dimensionality", 10);
    parameters.setInteger("cardinality", 2000000);
    parameters.setDouble("min", 0.0);
    parameters.setDouble("max", 200.0);

    // Set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Read the CSV input
    DataSet<?> input = InputConfig.setDataSetFromCSV(
        parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));

    // Broadcast bit string, cardinality, dimensionality, PPD
    @SuppressWarnings({ "unchecked", "rawtypes" })
    DataSet<Tuple4<BitSet, Integer, Integer, Integer>> metaData = input
        .mapPartition(new MR_GP_BitStringGeneratorMapper())
        .reduceGroup(new MR_GP_BitStringGeneratorReducer());

    // Calculate the result
    @SuppressWarnings({ "unchecked", "rawtypes" })
    DataSet<?> result = input
        .mapPartition(new MR_GPSRS_Mapper())
            .withBroadcastSet(metaData, "MetaData").withParameters(parameters)
        .reduceGroup(new MR_GPSRS_Reducer())
            .withBroadcastSet(metaData, "MetaData").withParameters(parameters);

    try {
        result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult job = env.execute();
        System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
    } catch (Exception e) {
        // TODO Auto-generated catch block
        // (note: an empty catch block silently swallows any exception
        // thrown by writeAsCsv() or execute())
    }

When I run the program with the integrated Flink mini cluster in Eclipse, the console prints only the following:

    19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.BitSet is not a valid POJO type
    19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.BitSet is not a valid POJO type
    19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.ArrayList is not a valid POJO type
    19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like the following:

    public class MR_GPSRS_Reducer<T extends Tuple>
            extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T> {
        [...]
        @Override
        public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition,
                Collector<T> out) throws Exception {
            [...]
            for (T tuple : tuples) {
                out.collect(tuple);
            }

If I change MR_GPSRS_Reducer to the following, the behavior stays the same:

    public class MR_GPSRS_Reducer<T extends Tuple>
            extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>> {
        [...]
        @Override
        public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition,
                Collector<Tuple1<T>> out) throws Exception {
            [...]
            for (T tuple : tuples) {
                out.collect(new Tuple1<T>(tuple));
            }

Same here:

    public class MR_GPSRS_Reducer<T extends Tuple>
            extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>> {
        [...]
        public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition,
                Collector<ArrayList<T>> out) throws Exception {
            [...]
            out.collect(tuples);

Only if I change MR_GPSRS_Reducer to the following does the Flink mini cluster start (the hints about invalid POJO types remain):

    public class MR_GPSRS_Reducer<T extends Tuple>
            extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>> {
        [...]
        public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition,
                Collector<Tuple1<ArrayList<T>>> out) throws Exception {
            [...]
            out.collect(new Tuple1<ArrayList<T>>(tuples));

But that isn't my preferred format for the data sink. If I remove the MR_GPSRS_Reducer (the last transformation), the Flink mini cluster also starts.

Does anybody have an idea how I can teach Flink to execute my program with T as the sink type? (In this case, T would be a Tuple10 of Doubles.) I have already tried to explicitly type the data sets and transformations, so that suppressing the warnings is no longer necessary.

I'm using <flink.version>1.0.1</flink.version>.

Thank you in advance
Robert
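PS: Here is a minimal sketch, independent of my skyline code, that shows the same behavior for me. It is only an illustration, not my real program: Tuple2<Double, Double> stands in for my Tuple10, the class and method names are made up for the example, and the elements are wrapped in an ArrayList as in my real job.

    import java.util.ArrayList;

    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class GenericOutRepro {

        // The output type is the bare type variable T, just as in
        // MR_GPSRS_Reducer. ArrayList is analyzed as a generic type
        // (hence the "not a valid POJO type" INFO messages), so T
        // cannot be deduced from the input type.
        public static class Unwrap<T extends Tuple>
                extends RichGroupReduceFunction<ArrayList<T>, T> {
            @Override
            public void reduce(Iterable<ArrayList<T>> values, Collector<T> out) {
                for (ArrayList<T> list : values) {
                    for (T t : list) {
                        out.collect(t);
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ArrayList<Tuple2<Double, Double>> list = new ArrayList<>();
            list.add(new Tuple2<>(1.0, 2.0));
            DataSet<ArrayList<Tuple2<Double, Double>>> input = env.fromElements(list);

            // Fails when the plan is built: the output type of Unwrap
            // cannot be determined from the generic ArrayList input.
            input.reduceGroup(new Unwrap<Tuple2<Double, Double>>()).print();
        }
    }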
Hi Mailing List,
after "upgrading" the Flink version in my pom.xml to 1.0.3, I now get two proper error messages for the output variants that don't work:

    org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(MR_GPSRS.java:69)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

    Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'class org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

After adding ".returns(input.getType())" to my transformation, everything works great now :-)

Many thanks to the developers who added these messages in the recent versions!

Best,
Robert
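PS: For the archives, the fix in context looks roughly like this (a sketch, not verbatim from my program: only the last transformation needs the hint, because the reducer emits the same element type as the input data set, which is why input.getType() is the right type information here):

    DataSet<?> result = input
        .mapPartition(new MR_GPSRS_Mapper())
            .withBroadcastSet(metaData, "MetaData").withParameters(parameters)
        .reduceGroup(new MR_GPSRS_Reducer())
            .withBroadcastSet(metaData, "MetaData").withParameters(parameters)
            // explicit hint: the reducer's OUT is the element type of 'input'
            .returns(input.getType());

The error message also mentions implementing ResultTypeQueryable as an alternative. That would look roughly like this (again only a sketch, assuming the concrete type information is available when the function is constructed):

    public class MR_GPSRS_Reducer<T extends Tuple>
            extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T>
            implements ResultTypeQueryable<T> {

        private final TypeInformation<T> outType;

        public MR_GPSRS_Reducer(TypeInformation<T> outType) {
            this.outType = outType;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            // tells the TypeExtractor the output type it cannot deduce itself
            return outType;
        }

        // reduce(...) unchanged
    }

As far as I can tell, the "is not a valid POJO type" lines are only INFO messages: the type extractor falls back to generic serialization for BitSet and ArrayList, so they were not the cause of the problem.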
Great to hear that it works now! :-)