Hello Fabian,

Thanks, your solution works indeed. However, I don't understand why.

When I replace the lambda by an explicit function

    def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]): String = {
      return pair._1.toString
    }

    input.map mapfunc2

I get the error below, which seemingly indicates that my method call maps both to the Scala version (the first overloaded method) and the Java version (which works with a MapFunction, the second one in the error message).

This was also the error I got when doing the following (which looks the most logical to me):

    def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
      return t1.toString()
    }

    input.map mapfunc

It would seem logical to me to decompose the pair into 2 separate arguments (which is what the Java version of the example also does at ...).

And this is the error message:

    both method map in class DataSet of type [R](fun: ((com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable)) => R)(implicit evidence$4: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
    and method map in class DataSet of type [R](mapper: org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
    match expected type ?

Thanks!
Frank

On Thu, Sep 8, 2016 at 6:56 PM, Fabian Hueske <[hidden email]> wrote:

Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], so a Tuple2[BSONWritable, BSONWritable], right?

Something like this should work:

    input.map( pair => pair._1.toString )

Pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key of the pair.

Alternatively you can also add an import org.apache.flink.api.scala.extensions._ and then you can do

    input.mapWith { case (x, y) => x }

Best, Fabian
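[Editor's note: a minimal sketch of how the two suggestions above fit together, assuming input is a DataSet[(BSONWritable, BSONWritable)] as in the thread; the value names keys and keysToo are purely illustrative.]

    // Sketch only: assumes `input: DataSet[(BSONWritable, BSONWritable)]` already exists.
    import org.apache.flink.api.scala._            // brings the implicit TypeInformation / ClassTag evidence into scope
    import org.apache.flink.api.scala.extensions._ // enables mapWith on DataSet

    // Option 1: a plain lambda over the whole Tuple2 -- this matches the Scala map overload.
    val keys: DataSet[String] = input.map(pair => pair._1.toString)

    // Option 2: mapWith from the extensions package, which lets you pattern-match
    // the tuple into its two fields instead of using _1 / _2.
    val keysToo: DataSet[String] = input.mapWith { case (key, _) => key.toString }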
2016-09-08 18:30 GMT+02:00 Frank Dekervel <[hidden email]>:

Hello,

I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format working in Scala. However, I get lost in the Scala generics system ... could somebody help me?

Code is below. Neither version works (compile error at the "map" call), either because of "method not applicable" or because of "ambiguous reference to overloaded method map" (Flink 1.0.3).

Thanks already,
greetings,
Frank

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.scala.DataSet;
    import org.apache.flink.api.scala.ExecutionEnvironment;
    import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.bson.BSONObject;
    import com.mongodb.BasicDBObject;
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.mapred.MongoInputFormat;

    val hdIf = new HadoopInputFormat(new MongoInputFormat(),
      classOf[BSONWritable], classOf[BSONWritable], new JobConf())
    hdIf.getJobConf().set("mongo.input.uri", "mongodb://localhost:27017/handling.event");

    val input = env.createInput(hdIf);

    def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
      return t1.toString()
    }

    // does not work
    //input.map mapfunc

    // does not work either
    input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )

    // does not work either
    //input.map ( (t1, t2) => t1 )
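[Editor's note: for completeness, a sketch of one way the snippet above could be made to compile, under a few assumptions not shown in the original mail: env is created here explicitly, the wildcard org.apache.flink.api.scala._ import supplies the implicit TypeInformation evidence, and the named function takes the record as a single Tuple2 and is wrapped in a lambda so the call resolves to the Scala (T) => R overload of map rather than the Java MapFunction one.]

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
    import org.apache.hadoop.mapred.JobConf
    import com.mongodb.hadoop.io.BSONWritable
    import com.mongodb.hadoop.mapred.MongoInputFormat

    // Assumed: the original mail never shows where env comes from.
    val env = ExecutionEnvironment.getExecutionEnvironment

    val hdIf = new HadoopInputFormat(new MongoInputFormat(),
      classOf[BSONWritable], classOf[BSONWritable], new JobConf())
    hdIf.getJobConf().set("mongo.input.uri", "mongodb://localhost:27017/handling.event")

    // input is a DataSet of (key, value) pairs, i.e. DataSet[(BSONWritable, BSONWritable)].
    val input = env.createInput(hdIf)

    // The function takes the record as one Tuple2, not as two separate arguments,
    // and is wrapped in a lambda below so it unambiguously matches the Scala overload.
    def mapfunc(pair: (BSONWritable, BSONWritable)): String = pair._1.toString

    val keys: DataSet[String] = input.map(pair => mapfunc(pair))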