scala version of flink mongodb example

Posted by Frank Dekervel on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/scala-version-of-flink-mongodb-example-tp8971.html

Hello,

I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format working in Scala. However, I get lost in the Scala generics system... could somebody help me?

The code is below. Neither version works (compile error at the "map" call), either because the method is not applicable or because of an ambiguous reference to the overloaded method "map" (Flink 1.0.3).

Thanks already
greetings,
Frank


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.api.scala._; // needed for the implicit TypeInformation
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;

import org.apache.hadoop.mapred.JobConf;
import org.bson.BSONObject;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;

val env = ExecutionEnvironment.getExecutionEnvironment

val hdIf = new HadoopInputFormat(new MongoInputFormat(),classOf[BSONWritable], classOf[BSONWritable], new JobConf())

hdIf.getJobConf().set("mongo.input.uri", 
    "mongodb://localhost:27017/handling.event");

val input = env.createInput(hdIf);

def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
    return t1.toString()
}

// does not work
//input.map mapfunc

// does not work either (this is where the compile error appears)
input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
// does not work either
//input.map( (t1, t2) => t1 )
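For reference, here is a sketch of a variant that should compile. The root cause of the errors above is that `createInput` on a Hadoop input format produces a `DataSet[(BSONWritable, BSONWritable)]`, i.e. a data set of key/value *tuples*, so `map` takes one function of a single tuple argument, not a two-argument function. This is untested against a live cluster and assumes `flink-hadoop-compatibility` and the mongo-hadoop connector are on the classpath:

```scala
import org.apache.flink.api.scala._ // brings in the implicit TypeInformation
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat

import org.apache.hadoop.mapred.JobConf

import com.mongodb.hadoop.io.BSONWritable
import com.mongodb.hadoop.mapred.MongoInputFormat

object MongoExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val hdIf = new HadoopInputFormat(
      new MongoInputFormat(), classOf[BSONWritable], classOf[BSONWritable], new JobConf())
    hdIf.getJobConf().set("mongo.input.uri", "mongodb://localhost:27017/handling.event")

    // createInput yields DataSet[(BSONWritable, BSONWritable)]
    val input = env.createInput(hdIf)

    // map takes ONE argument: the (key, value) tuple
    val strings = input.map { kv => kv._1.toString }

    // equivalently, destructure the tuple with a pattern-matching function:
    // val strings = input.map { case (key, value) => key.toString }

    strings.print()
  }
}
```

The `import org.apache.flink.api.scala._` line matters: it provides the implicit `TypeInformation` that `createInput` and `map` require in the Scala API, which is another common source of "ambiguous reference" and "method not applicable" errors.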