scala version of flink mongodb example

Frank Dekervel
Hello,

I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format working in Scala. However, I get lost in the Scala generics system... could somebody help me?

The code is below; neither version works (compile error at the "map" call), either because the method is not applicable or because of an ambiguous reference to the overloaded method map (Flink 1.0.3).

Thanks already
greetings,
Frank


import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat

import org.apache.hadoop.mapred.JobConf
import org.bson.BSONObject

import com.mongodb.BasicDBObject
import com.mongodb.hadoop.io.BSONWritable
import com.mongodb.hadoop.mapred.MongoInputFormat

// obtain the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

val hdIf = new HadoopInputFormat(new MongoInputFormat(), classOf[BSONWritable], classOf[BSONWritable], new JobConf())

hdIf.getJobConf().set("mongo.input.uri",
    "mongodb://localhost:27017/handling.event")

val input = env.createInput(hdIf)

def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
    return t1.toString()
}

// does not compile
//input.map mapfunc

// does not compile either
input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
// does not compile either
//input.map ( (t1, t2) => t1 )

Re: scala version of flink mongodb example

Fabian Hueske-2
Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], i.e., a DataSet of Tuple2[BSONWritable, BSONWritable], right?

Something like this should work:

input.map( pair => pair._1.toString )

Here pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key of the pair.

Alternatively, you can add import org.apache.flink.api.scala.extensions._

and then you can do

input.mapWith { case (x, y) => x }

Best, Fabian
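
For reference, a minimal end-to-end sketch that puts the original snippet and the suggestion above together could look as follows. It is untested; the enclosing MongoReadJob object, the wildcard import (which provides the implicit TypeInformation), and the final print() sink are illustrative additions rather than something taken from this thread.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
import org.apache.hadoop.mapred.JobConf
import com.mongodb.hadoop.io.BSONWritable
import com.mongodb.hadoop.mapred.MongoInputFormat

object MongoReadJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Wrap the mongo-hadoop input format; both key and value are BSONWritable.
    val hdIf = new HadoopInputFormat(
      new MongoInputFormat(), classOf[BSONWritable], classOf[BSONWritable], new JobConf())
    hdIf.getJobConf().set("mongo.input.uri", "mongodb://localhost:27017/handling.event")

    // DataSet[(BSONWritable, BSONWritable)]
    val input = env.createInput(hdIf)

    // Keep only the key of each (key, value) pair, as suggested above.
    val keys = input.map(pair => pair._1.toString)

    // print() collects the result to the client and triggers execution.
    keys.print()
  }
}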


Re: scala version of flink mongodb example

Frank Dekervel
Hello Fabian,

Thanks, your solution does indeed work. However, I don't understand why.
When I replace the lambda with an explicit function

def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]): String = {
    return pair._1.toString
}
input.map(mapfunc2)


I get the error below, which seems to indicate that my method call matches both the Scala version of map (the first overloaded method in the error) and the Java version (the one that takes a MapFunction, the second in the error message).

This is also the error I got when doing the following (which looks the most logical to me):

def mapfunc(t1: BSONWritable, t2: BSONWritable): String = {
    return t1.toString()
}
input.map(mapfunc)

It would seem logical to me to be able to decompose the pair into two separate arguments (which is what the Java version of the example also does).

This is the error message:

both method map in class DataSet of type [R](fun: ((com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable)) => R)(implicit evidence$4: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
and method map in class DataSet of type [R](mapper: org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2: org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
match expected type ?
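
In other words, the compiler only eta-expands mapfunc2 into a function value once it knows the expected parameter type, and with two map overloads it cannot decide which one to type the argument against. A sketch of an untested workaround (the val name is made up here) is to pin the function type first, so that only the Scala overload can accept the argument:

// Untested: give mapfunc2 an explicit function type before calling map,
// so the argument is already a plain Scala function value.
val asFunction: ((BSONWritable, BSONWritable)) => String = mapfunc2
input.map(asFunction)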

Thanks!
Frank


Re: scala version of flink mongodb example

Fabian Hueske-2
Hi Frank,

I didn't try to run the code, but this does not show a compiler error in IntelliJ:

> input.map( mapfunc2 _ )

Decomposing the Tuple2 into two separate arguments only works with Scala's pattern matching (that is the second approach I posted, using mapWith).
The Java API cannot split the fields of a Tuple2 argument into separate arguments.

Best, Fabian
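
For the record, all three of these variants should then compile. This is only a sketch: it assumes the wildcard import org.apache.flink.api.scala._ and, for the last line, org.apache.flink.api.scala.extensions._ are in scope, and it has not been verified against Flink 1.0.3.

// Scala lambda over the tuple (Fabian's first suggestion)
input.map(pair => pair._1.toString)

// Explicit eta-expansion of the single-tuple-argument method
input.map(mapfunc2 _)

// Pattern-matching decomposition of the pair via the extensions API
input.mapWith { case (key, value) => key.toString }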


Re: scala version of flink mongodb example

alex.decastro
Hi there, jumping in on this thread:

Do you know how to add authentication options for Mongo here?

I'm trying to do

hdIf.getJobConf.set("user", s"$USER")
hdIf.getJobConf.set("password", s"$PWD")

but I can't find any documentation to support it.

Any pointers?

Many thanks,
Alex
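
One untested possibility, based on the general MongoDB connection string syntax rather than on anything documented for this connector: credentials can usually be embedded directly in the URI that is already being set earlier in the thread, e.g.:

// Untested sketch: embed the credentials in the standard MongoDB URI
// ($USER and $PWD are the same placeholders as above).
hdIf.getJobConf.set("mongo.input.uri",
    s"mongodb://$USER:$PWD@localhost:27017/handling.event")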