Lazy Evaluation


Lazy Evaluation

Paschek, Robert
Hi Mailing List,

I probably have a problem with lazy evaluation. Depending on the return data type of my last transformation (GroupReduce), the integrated Flink mini cluster does not start. I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", "generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 2000000);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting Up Execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Reading CSV
DataSet<?> input = InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));
               

//Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<Tuple4<BitSet,Integer,Integer,Integer>> metaData =
                                input
                                .mapPartition(new MR_GP_BitStringGeneratorMapper())
                                .reduceGroup(new MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<?> result = input
                                .mapPartition(new MR_GPSRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
                                .reduceGroup(new MR_GPSRS_Reducer()).withBroadcastSet(metaData, "MetaData").withParameters(parameters);
               

try {
        result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult job = env.execute();
        System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
} catch (Exception e) {
        e.printStackTrace();  // don't swallow the exception
}
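(For context: the DataSet API is lazy; the plan above runs nothing until env.execute() is called. A minimal plain-Java sketch of the idea, using a hypothetical Supplier-based "plan" rather than anything from Flink:)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Assembling the plan executes nothing; only get() -- the analogue of
// env.execute() -- actually runs the work.
public class LazyDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Building the "plan" records nothing in the log yet.
        Supplier<Integer> plan = () -> { log.add("ran"); return 42; };
        System.out.println(log.isEmpty());  // true: nothing ran yet

        // "Executing" the plan triggers the deferred work.
        int result = plan.get();
        System.out.println(result);  // 42
        System.out.println(log);     // [ran]
    }
}
```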



When I run my program with the integrated Flink mini cluster in Eclipse, the console outputs only the following:
19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like this:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T>
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<T> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(tuple);
}

If I change the code of MR_GPSRS_Reducer to the following, the behavior is still the same:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>>{
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<T>> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(new Tuple1<T>(tuple));
}

The same happens here:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<ArrayList<T>> out) throws Exception {
[...]
out.collect(tuples);

Only if I change MR_GPSRS_Reducer to the following does the Flink mini cluster start:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<ArrayList<T>>> out) throws Exception {
[...]
out.collect(new Tuple1<ArrayList<T>>(tuples));
(The hints about the invalid POJO types still remain.)

But that isn't my preferred format for the data sink...
If I remove the MR_GPSRS_Reducer (the last transformation), the Flink mini cluster also starts.


Does anybody have an idea how I can teach Flink to execute my program with T as the data sink type? (In this case, T would be a Tuple10 of Doubles.)
(I have already tried explicitly typecasting the data sets and transformations, so that suppressing the warnings is no longer necessary.)

I'm using <flink.version>1.0.1</flink.version>

Thank you in advance
Robert



AW: Lazy Evaluation

Paschek, Robert
Hi Mailing List,

after "upgrading" the Flink version in my pom.xml to 1.0.3, I get two error messages for the output variants that don't work:

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(MR_GPSRS.java:69)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'class org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
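To see why 'OUT' cannot be determined: MR_GPSRS_Reducer<T> is itself generic, so after erasure the OUT position of its superclass is just the bare type variable T. The following plain-Java sketch (MyReduceFunction, GenericReducer, and ConcreteReducer are hypothetical stand-ins, not Flink classes) shows what this kind of reflection can and cannot recover:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;

// Hypothetical stand-in for RichGroupReduceFunction<IN, OUT>.
abstract class MyReduceFunction<IN, OUT> { }

// Mirrors MR_GPSRS_Reducer<T extends Tuple>: the subclass is itself
// generic, so OUT is still the bare type variable T after erasure.
class GenericReducer<T> extends MyReduceFunction<ArrayList<T>, T> { }

// A fully concrete subclass: here OUT can be recovered via reflection.
class ConcreteReducer extends MyReduceFunction<ArrayList<String>, String> { }

public class ErasureDemo {
    // Read the OUT type argument off the generic superclass -- the same
    // kind of inspection a type extractor performs.
    static Type outTypeOf(Class<?> cls) {
        ParameterizedType sup = (ParameterizedType) cls.getGenericSuperclass();
        return sup.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        System.out.println(outTypeOf(GenericReducer.class) instanceof TypeVariable);  // true: OUT is just "T"
        System.out.println(outTypeOf(ConcreteReducer.class));  // class java.lang.String
    }
}
```

The generic case mirrors MR_GPSRS_Reducer<T extends Tuple>; the concrete case is why a non-generic reducer (or an explicit type hint) works.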


After adding ".returns(input.getType())" to my transformation, everything works great now :-)
Many thanks to the developers who added these messages in the recent versions!
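(The .returns(...) hint works because it hands the pipeline the type information that erasure destroyed. A toy plain-Java model of that design, where MiniDataSet, map, and returns are hypothetical names loosely modeled on Flink's DataSet API:)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// The pipeline cannot learn the result type R of an erased Function at
// run time, so the caller supplies it explicitly -- the same role that
// .returns(input.getType()) plays in Flink.
class MiniDataSet<T> {
    final List<T> data;
    Class<?> outType;  // recovered type information; null if unknown

    MiniDataSet(List<T> data) { this.data = data; }

    // Apply a transformation; like Flink, we cannot see R after erasure,
    // so the resulting data set carries no type information.
    <R> MiniDataSet<R> map(Function<T, R> f) {
        List<R> mapped = new ArrayList<>();
        for (T t : data) mapped.add(f.apply(t));
        return new MiniDataSet<>(mapped);  // outType stays null
    }

    // The explicit type hint, analogous to DataSet.returns(...).
    MiniDataSet<T> returns(Class<T> hint) {
        this.outType = hint;
        return this;
    }
}

public class ReturnsDemo {
    public static void main(String[] args) {
        MiniDataSet<Integer> input = new MiniDataSet<>(List.of(1, 2, 3));

        MiniDataSet<String> mapped = input.map(Object::toString);
        System.out.println(mapped.outType);  // null: erased, unknown

        MiniDataSet<String> hinted = mapped.returns(String.class);
        System.out.println(hinted.outType);  // class java.lang.String
    }
}
```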

Best,
Robert
________________________________________
From: Paschek, Robert <[hidden email]>
Sent: Tuesday, June 14, 2016, 19:52
To: [hidden email]
Subject: Lazy Evaluation




Re: Lazy Evaluation

Aljoscha Krettek
Great to hear that it works now! :-)

On Sun, 19 Jun 2016 at 16:33 Paschek, Robert <[hidden email]> wrote: