DataSet/DataStream of scala type class interface


Salva Alcántara
I am just experimenting with the usage of Scala type classes within Flink. I
have defined the following type class interface:

```scala
trait LikeEvent[T] {
        def timestamp(payload: T): Int
}
```

Now, I want to consider a `DataSet` of events whose payload types all have a `LikeEvent` instance. The setup looks like this:

```scala
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {

        implicit val logEvent: LikeEvent[Log] = new LikeEvent[Log] {
                def timestamp(log: Log): Int = log.ts
        }

        // Metric's field is called `timestamp`, not `ts`
        implicit val metricEvent: LikeEvent[Metric] = new LikeEvent[Metric] {
                def timestamp(metric: Metric): Int = metric.timestamp
        }
}

// add ops to the raw event classes (regular class)
object EventSyntax {

        implicit class Event[T: LikeEvent](val payload: T) {
                val le = implicitly[LikeEvent[T]]
                def timestamp: Int = le.timestamp(payload)
        }
}
```
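For reference, the type class, instances and syntax above can be exercised on plain Scala values without Flink. This is a minimal sketch assuming Scala 2.13; `SyntaxDemo` is a hypothetical name used only for illustration. It shows both ways the implicit class `Event` is picked up: when calling a missing member (`Log` has no `timestamp`) and when the expected element type is `Event[_]`.

```scala
trait LikeEvent[T] {
  def timestamp(payload: T): Int
}

case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

object EventInstance {
  implicit val logEvent: LikeEvent[Log] = new LikeEvent[Log] {
    def timestamp(log: Log): Int = log.ts
  }
  implicit val metricEvent: LikeEvent[Metric] = new LikeEvent[Metric] {
    def timestamp(metric: Metric): Int = metric.timestamp
  }
}

object EventSyntax {
  implicit class Event[T: LikeEvent](val payload: T) {
    val le: LikeEvent[T] = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }
}

// hypothetical demo object, not part of the original post
object SyntaxDemo {
  import EventInstance._
  import EventSyntax._

  // Log has no `timestamp` member, so the compiler wraps it in Event[Log]
  // and uses the LikeEvent[Log] instance (logEvent) to answer the call.
  val logTs: Int = Log(1586736005, 1, "invalid login").timestamp

  // With an explicit element type, each payload is wrapped in Event[...]
  // together with the LikeEvent instance that matches its own type.
  val events: List[Event[_]] = List[Event[_]](
    Metric(1586736000, "cpu_usage", 0.2),
    Log(1586736010, 1, "invalid login")
  )
  val timestamps: List[Int] = events.map(_.timestamp)
}
```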

The following app runs just fine:

```scala
import org.apache.flink.api.scala._
import EventInstance._
import EventSyntax._

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per minute (GetMinuteEventTuple is not shown in this post)
val eventsPerMinute = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
    val gl = g.toList
    val (minute, count) = (gl.head._1, gl.size)
    (minute, count)
  }

eventsPerMinute.print()
```

which prints the expected output:

```
(0,5)
(1,1)
(2,1)
```
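`GetMinuteEventTuple` is not shown in the post. As a sanity check on the printed output, here is a plain-Scala sketch of the same grouping using standard collections instead of Flink. It assumes, hypothetically, that each event is mapped to the number of whole minutes since the earliest timestamp; `minuteOf` and `MinuteCountSketch` are illustrative names, not the post's code.

```scala
// hypothetical sketch, not the original GetMinuteEventTuple
object MinuteCountSketch {
  // timestamps of the seven events passed to env.fromElements above
  val timestamps: List[Int] =
    List(1586736000, 1586736005, 1586736010, 1586736015, 1586736030, 1586736060, 1586736120)

  // ASSUMPTION: minutes are counted relative to the earliest timestamp
  def minuteOf(ts: Int, base: Int): Int = (ts - base) / 60

  val base: Int = timestamps.min

  // group by minute and count group sizes, like groupBy(0).reduceGroup
  val eventsPerMinute: List[(Int, Int)] =
    timestamps
      .groupBy(ts => minuteOf(ts, base))
      .map { case (minute, group) => (minute, group.size) }
      .toList
      .sortBy(_._1)
}
```

Under that assumption the result is `List((0,5), (1,1), (2,1))`, matching the Flink output.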

However, if I modify the syntax object like this:

```scala
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {

  case class Event[T: LikeEvent](payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}
```

I get the following error:


```
type mismatch;
found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
```

So, guided by the message, I make the following change:

```scala
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
```

After that, the error changes to:

```
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
```

I cannot understand why `EventSyntax2` results in these errors, whereas
`EventSyntax` compiles and runs well. Why is using a case class wrapper in
`EventSyntax2` more problematic than using a regular class as in
`EventSyntax`?
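One way to separate the type-class encoding from Flink is to run `EventSyntax2` on a plain Scala `List` with an explicit element type, mirroring the `fromElements[Event[_]](...)` change. This is a minimal sketch assuming Scala 2.13; `CaseClassDemo` is a hypothetical name. If it behaves as expected, the case class wrapper and the `fromPayload` conversion are not themselves the problem, which is consistent with the second error message pointing at `TypeInformation` for `Event[_]`.

```scala
import scala.language.implicitConversions

trait LikeEvent[T] { def timestamp(payload: T): Int }

case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

object EventInstance {
  implicit val logEvent: LikeEvent[Log] = new LikeEvent[Log] {
    def timestamp(log: Log): Int = log.ts
  }
  implicit val metricEvent: LikeEvent[Metric] = new LikeEvent[Metric] {
    def timestamp(metric: Metric): Int = metric.timestamp
  }
}

object EventSyntax2 {
  case class Event[T: LikeEvent](payload: T) {
    val le: LikeEvent[T] = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}

// hypothetical demo object, no Flink involved
object CaseClassDemo {
  import EventInstance._
  import EventSyntax2._

  // Each argument is typed against Event[_], so fromPayload converts it
  // using the LikeEvent instance for its concrete payload type.
  val events: List[Event[_]] = List[Event[_]](
    Metric(1586736000, "cpu_usage", 0.2),
    Log(1586736005, 1, "invalid login")
  )

  val timestamps: List[Int] = events.map(_.timestamp)
}
```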

Anyway, my question is twofold:

- How can I solve my problem with `EventSyntax2`?
- What would be the simplest way to achieve my goals? Here, I am just
experimenting with the type class pattern for the sake of learning, but
a more object-oriented approach (based on subtyping) definitely looks
simpler to me. Something like this:

```scala
// Define trait
trait Event {
        def timestamp: Int
        def payload: Product with Serializable
}

// Metric adapter (similar for Log)
object MetricAdapter {

        implicit class MetricEvent(val payload: Metric) extends Event {
                def timestamp: Int = payload.timestamp
        }
}
```

And then simply use `val events: DataSet[Event] = env.fromElements(...)` in
the main method.
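The subtyping approach can also be checked on plain Scala values. This is a minimal sketch assuming Scala 2.13, with no Flink involved. The `LogAdapter` object and its `LogEvent` class are hypothetical fill-ins for the "similar for Log" adapter, and `SubtypingDemo` is an illustrative name.

```scala
case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

trait Event {
  def timestamp: Int
  def payload: Product with Serializable
}

object MetricAdapter {
  // a case class is a Product with Serializable, so `val payload: Metric`
  // is a valid (narrower) implementation of the abstract `payload`
  implicit class MetricEvent(val payload: Metric) extends Event {
    def timestamp: Int = payload.timestamp
  }
}

// hypothetical Log counterpart of MetricAdapter
object LogAdapter {
  implicit class LogEvent(val payload: Log) extends Event {
    def timestamp: Int = payload.ts
  }
}

object SubtypingDemo {
  import LogAdapter._
  import MetricAdapter._

  // each raw value is wrapped by the implicit class whose parameter type matches
  val events: List[Event] = List[Event](
    Metric(1586736000, "cpu_usage", 0.2),
    Log(1586736005, 1, "invalid login"),
    Log(1586736120, 0, "end of world")
  )

  val timestamps: List[Int] = events.map(_.timestamp)
}
```

Here every element is an ordinary subtype of `Event`, so no existential type or per-element type class instance is involved.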


Re: DataSet/DataStream of scala type class interface

Salva Alcántara