Map from Tuple to Case Class

Robert Schmidtke
Hi everyone,

First up, I'm new to Scala, so please bear with me, but I could not find any solution on the web or in the Flink documentation. I'm having trouble converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case class. I got it to work, but in a way that feels too verbose for Scala:


import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
    .map[Ranking](new MapFunction[(LongWritable, Text), Ranking] {
      override def map(value: (LongWritable, Text)) = {
        val splits = value._2.toString.split(",")
        new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
      }
    })


Is there a simpler way of doing this? All other variants I've tried yield some type information errors.

Thanks in advance!
Robert

--
My GPG Key ID: 336E2680

Re: Map from Tuple to Case Class

stefanobaghino
The only real noise I see is the use of an explicit MapFunction, which can be rewritten as a plain function literal in Scala:

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
    .map { value: (LongWritable, Text) =>
      val Array(name, n, m) = value._2.toString.split(",")
      Ranking(name, n.toInt, m.toInt) // no "new" needed for case classes
    }

As you may have noticed, I've also destructured the split result with an Array pattern. Destructuring the tuple itself more concisely is possible with an API extension [1] (which won't be available before 1.1, I suppose); see the sketch below.
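For reference, here is an untested sketch of what the extension-based version should look like (it assumes the mapWith syntax from org.apache.flink.api.scala.extensions, as described for 1.1):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._ // acceptPartialFunctions, Flink 1.1+

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
    .mapWith { case (_, text) => // the tuple is destructured right in the pattern
      val Array(name, n, m) = text.toString.split(",")
      Ranking(name, n.toInt, m.toInt)
    }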

Since you're parsing textual data, it may also make sense to handle error conditions for malformed input; here is an example that uses flatMap to do so:

import org.apache.flink.util.Collector // for the collector variant of flatMap
import scala.util.{Try, Success, Failure} // needed to work with the "functional" Try

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
    .flatMap { (value: (LongWritable, Text), out: Collector[Ranking]) =>
      Try {
        val Array(name, n, m) = value._2.toString.split(",") // MatchError if the array size != 3
        Ranking(name, n.toInt, m.toInt) // NumberFormatException if n or m are not numbers
      } match {
        case Success(ranking) => out.collect(ranking) // emit only well-formed records
        case Failure(exception) => () // drop malformed input, perhaps log the exception
      }
    }

Feel free to ask for clarification on the snippets [2] I posted; I'll gladly help you further if you need it.

Last note: I'm not a user myself, but I believe Shapeless has some very handy constructs for moving back and forth between tuples and case classes (please take this with a grain of salt).
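For instance, something along these lines should work with Shapeless's Generic (again untested, and only a sketch of the idea):

import shapeless.Generic

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

// Generic provides an isomorphism between a product type and its HList representation.
val tupleGen = Generic[(String, Int, Int)]
val rankingGen = Generic[Ranking]

// Both representations are String :: Int :: Int :: HNil, so they line up directly.
val ranking: Ranking = rankingGen.from(tupleGen.to(("http://example.com", 42, 7)))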

[2]: I didn't test them, so caution is advisable ;)

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Map from Tuple to Case Class

Robert Schmidtke
Thanks Stefano! I guess you're right: it's probably not too bad apart from the MapFunction, which I have now swapped for your suggestion. I was just a bit confused that I had to state so many types when I thought they could be inferred automatically; I tried variations of the "non-explicit" MapFunction, but I must have messed something up. The Array matching is pretty handy as well. I'm good to go now: everything works and looks a bit more Scala-y :)

Robert

--
My GPG Key ID: 336E2680

Re: Map from Tuple to Case Class

stefanobaghino
I just noticed my snippets contained quite a few errors, but I'm glad they've been helpful. :)

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit