Hello,
I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format working in Scala. However, I get lost in the Scala generics system; could somebody help me? The code is below. Neither version works (compile error at the "map" call), either because of "method not applicable" or because of "ambiguous reference to overloaded method map" (Flink 1.0.3).

Thanks already,
greetings,
Frank

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
import org.apache.hadoop.mapred.JobConf
import org.bson.BSONObject
import com.mongodb.BasicDBObject
import com.mongodb.hadoop.io.BSONWritable
import com.mongodb.hadoop.mapred.MongoInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

val hdIf = new HadoopInputFormat(new MongoInputFormat(),
  classOf[BSONWritable], classOf[BSONWritable], new JobConf())
hdIf.getJobConf().set("mongo.input.uri", "mongodb://localhost:27017/handling.event")

val input = env.createInput(hdIf)

def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
  return t1.toString()
}

// does not work
// input.map(mapfunc)

// does not work either
input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )

// does not work either
// input.map( (t1, t2) => t1 )
Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], so a Tuple2[BSONWritable, BSONWritable], right? And then you can do:

input.mapWith { case (x, y) => x }

Best, Fabian

2016-09-08 18:30 GMT+02:00 Frank Dekervel <[hidden email]>:
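For reference, a minimal self-contained sketch of that suggestion. One assumption to verify: mapWith comes from the flink-scala extensions package, which was added in Flink 1.1, so it may not be available on 1.0.3; the plain map over the whole tuple at the end should work without the extensions (assuming a pre-SAM Scala version such as 2.11, where a lambda can only match the Function1 overload).

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
import com.mongodb.hadoop.io.BSONWritable

// Extract the key half of each (key, value) pair coming out of the
// Hadoop input format.
def extractKeys(input: DataSet[(BSONWritable, BSONWritable)]): DataSet[BSONWritable] =
  // mapWith accepts a partial function, so the tuple decomposes directly
  input.mapWith { case (key, value) => key }

// Without the extensions: a plain lambda over the whole Tuple2 is
// unambiguous, because only the Scala overload of `map` takes a Function1.
def extractKeysPlain(input: DataSet[(BSONWritable, BSONWritable)]): DataSet[BSONWritable] =
  input.map(pair => pair._1)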
Hello Fabian,

Thanks, your solution works indeed. However, I don't understand why. When I replace the lambda by an explicit function:

def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]): String = {
  return pair._1.toString
}

input.map(mapfunc2)

I get the error below, which seemingly indicates that my method call matches both the Scala version (first overloaded method) and the Java version (which works with a MapFunction, second one in the error message).

This was also the error I got when doing the following (which looks the most logical to me):

def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
  return t1.toString()
}

input.map(mapfunc)

It would seem logical to me to decompose the pair as two separate arguments (which is what the Java version of the example also does). This is the error message:

both method map in class DataSet of type [R](fun: ((com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable)) => R)(implicit evidence$4: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
and method map in class DataSet of type [R](mapper: org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
match expected type ?

Thanks!
Frank

On Thu, Sep 8, 2016 at 6:56 PM, Fabian Hueske <[hidden email]> wrote:
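For anyone reading along: the error means overload resolution fails before either `map` is ever chosen — passing a bare method name forces the compiler to eta-expand it while both overloads are still candidates. One hedged way out (besides the trailing underscore Fabian shows below) is to give the argument an explicit function type first, so only the Function1 overload can match; a sketch, with illustrative names:

import org.apache.flink.api.scala._
import com.mongodb.hadoop.io.BSONWritable

def disambiguated(input: DataSet[(BSONWritable, BSONWritable)]): DataSet[String] = {
  // Ascribing the function type up front leaves only the Scala overload
  // map[R](fun: ((BSONWritable, BSONWritable)) => R) as a candidate.
  val toKeyString: ((BSONWritable, BSONWritable)) => String =
    pair => pair._1.toString
  input.map(toKeyString)
}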
Hi Frank,

I didn't try to run the code, but this does not show a compiler error in IntelliJ:

> input.map( mapfunc2 _ )

Decomposing the Tuple2 into two separate arguments only works with Scala's pattern matching technique (this is the second approach I posted).

2016-09-08 20:54 GMT+02:00 Frank Dekervel <[hidden email]>:
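A short sketch putting both fixes together. Note that the two-argument lambda from the original post can never match: each element of the DataSet is a single Tuple2, not two separate arguments.

import org.apache.flink.api.scala._
import com.mongodb.hadoop.io.BSONWritable

object MapVariants {
  def mapfunc2(pair: (BSONWritable, BSONWritable)): String = pair._1.toString

  def run(input: DataSet[(BSONWritable, BSONWritable)]): Unit = {
    // The trailing underscore eta-expands the method into a Function1 value
    // *before* overload resolution, so only the Scala `map` overload applies.
    input.map(mapfunc2 _)

    // To work with named parts, decompose the tuple inside the lambda body
    // (or use the mapWith / pattern-matching variant from earlier).
    input.map { pair =>
      val (key, value) = pair
      key.toString
    }
  }
}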
Hi there, chiming in on this thread:
do you know how to add authentication options to Mongo here? I'm trying to do:

hdIf.getJobConf.set("user", s"$USER")
hdIf.getJobConf.set("password", s"$PWD")

but I can't find any documentation to support it. Any pointers?

Many thanks,
Alex
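No reply appears in the thread, but a hedged sketch for anyone with the same question, reusing hdIf from Frank's snippet: mongo-hadoop generally takes credentials embedded in the connection URI rather than as separate user/password keys. The exact option names (authSource, mongo.auth.uri) are assumptions to verify against the mongo-hadoop version in use.

// Placeholder credentials; substitute real values.
val USER = "..."
val PWD  = "..."

// Assumption: credentials embedded in the input URI itself.
hdIf.getJobConf.set(
  "mongo.input.uri",
  s"mongodb://$USER:$PWD@localhost:27017/handling.event?authSource=admin")

// Assumption: some mongo-hadoop versions also accept a separate auth URI
// (check MongoConfigUtil for the exact key).
hdIf.getJobConf.set(
  "mongo.auth.uri",
  s"mongodb://$USER:$PWD@localhost:27017/admin")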