Hi everyone,
First up, I'm new to Scala, so please bear with me, but I could not find any solution on the web or in the Flink documentation. I'm having trouble converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case class. I got it to work, but in a way that feels too verbose for Scala:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] = env
  .readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
  .map[Ranking](new MapFunction[(LongWritable, Text), Ranking] {
    override def map(value: (LongWritable, Text)) = {
      val splits = value._2.toString.split(",")
      new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
    }
  })

Is there a simpler way of doing this? All other variants I've tried yield type information errors.

Thanks in advance!
Robert

My GPG Key ID: 336E2680
The only real noise I see is the usage of a MapFunction, which can be rewritten like this in Scala:

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] = env
  .readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
  .map { value: (LongWritable, Text) =>
    val Array(name, n, m) = value._2.toString.split(",")
    Ranking(name, n.toInt, m.toInt) // no new needed for case classes
  }

As you may have noticed, I've also destructured the split line in the first line of the function body. Another, more concise way to do this kind of destructuring is to use an API extension [1] (which won't be available before 1.1, I suppose); there is a rough sketch of it at the end of this message.

Since you're parsing textual data, it could also make sense to handle error conditions for malformed inputs; here is an example that uses flatMap to do so:

import org.apache.flink.util.Collector
import scala.util.{Try, Success, Failure} // needed to work with the "functional" Try

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] = env
  .readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
  .flatMap { (value: (LongWritable, Text), out: Collector[Ranking]) =>
    Try {
      val Array(name, n, m) = value._2.toString.split(",") // throws if the line doesn't have exactly 3 fields
      Ranking(name, n.toInt, m.toInt) // throws if n or m are not numbers
    } match {
      case Success(ranking) => out.collect(ranking) // emit only well-formed records
      case Failure(exception) => // deal with malformed input, perhaps log it
    }
  }

Feel free to ask me for any kind of clarification on the snippets [2] I posted; I'll gladly help you further if you need it.

Last note: I'm not a user myself, but I believe Shapeless has some very handy constructs to move back and forth between tuples and case classes (please take this with a grain of salt); a small example is sketched at the end of this message as well.

[2]: I didn't test them, so caution is advisable ;)
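To give you an idea of the extension [1] I mentioned above, here is a rough, untested sketch; note that the import path and the mapWith name are my best guess at the not-yet-released API:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._ // assumed import; the extension won't be released before 1.1

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] = env
  .readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
  .mapWith { case (_, text) => // destructure the tuple directly, ignoring the key
    val Array(name, n, m) = text.toString.split(",")
    Ranking(name, n.toInt, m.toInt)
  }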
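As for Shapeless, I believe the conversion between tuples and case classes goes through Generic; a minimal sketch of the idea (again untested, independent of Flink, and to be taken with the same grain of salt):

import shapeless._

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val tuple = ("http://example.com", 42, 7)
// Turn the tuple into an HList, then build the case class from that HList
val hlist = Generic[(String, Int, Int)].to(tuple) // "http://example.com" :: 42 :: 7 :: HNil
val ranking = Generic[Ranking].from(hlist) // Ranking("http://example.com", 42, 7)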
On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <[hidden email]> wrote:

BR,
Stefano Baghino
Thanks Stefano! I guess you're right, it's probably not too bad except for the MapFunction, which I have now swapped for your suggestion. I was just a bit confused by the fact that I had to state so many types when I thought they could be inferred automatically; I tried variations of the "non-explicit" MapFunction, but I must have messed something up. The Array matching is pretty handy as well. I'm good to go now, everything works and looks a bit more Scala-y :)

Robert
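P.S. For reference, this is roughly what I ended up with (reproduced from memory, so treat it as a sketch rather than verified code):

import org.apache.flink.api.scala._
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] = env
  .readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
  .map { value: (LongWritable, Text) =>
    val Array(url, rank, duration) = value._2.toString.split(",")
    Ranking(url, rank.toInt, duration.toInt)
  }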
On Wed, May 4, 2016 at 3:42 PM, Stefano Baghino <[hidden email]> wrote:

My GPG Key ID: 336E2680
I just noticed my snippets contained a whole lot of errors as I first typed them, but I'm glad they've been helpful. :)

On Wed, May 4, 2016 at 3:59 PM, Robert Schmidtke <[hidden email]> wrote:
BR,
Stefano Baghino