I am just experimenting with the usage of Scala type classes within Flink. I have defined the following type class interface:

```scala
trait LikeEvent[T] {
  def timestamp(payload: T): Int
}
```

Now, I want to consider a `DataSet` of `LikeEvent[_]` like this:

```scala
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {
  implicit val logEvent = new LikeEvent[Log] {
    def timestamp(log: Log): Int = log.ts
  }

  implicit val metricEvent = new LikeEvent[Metric] {
    def timestamp(metric: Metric): Int = metric.timestamp
  }
}

// add ops to the raw event classes (regular class)
object EventSyntax {
  implicit class Event[T: LikeEvent](val payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }
}
```

The following app runs just fine:

```scala
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per hour
val eventsPerHour = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
    val gl = g.toList
    val (hour, count) = (gl.head._1, gl.size)
    (hour, count)
  }

eventsPerHour.print()
```

It prints the expected output:

```
(0,5)
(1,1)
(2,1)
```

However, if I modify the syntax object like this:

```scala
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {
  case class Event[T: LikeEvent](payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}
```

I get the following error:

```
type mismatch;
 found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
 required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
```

So, guided by the message, I make the following change:

```scala
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
```

After that, the error changes to:

```
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
```

I cannot understand why `EventSyntax2` results in these errors, whereas `EventSyntax` compiles and runs well. Why is using a case class wrapper in `EventSyntax2` more problematic than using a regular class as in `EventSyntax`?

Anyway, my question is twofold:

- How can I solve my problem with `EventSyntax2`?
- What would be the simplest way to achieve my goals? Here, I am just experimenting with the type class pattern for the sake of learning, but a more object-oriented approach (based on subtyping) definitely looks simpler to me. Something like this:

```scala
// Define trait
trait Event {
  def timestamp: Int
  def payload: Product with Serializable
}

// Metric adapter (similar for Log)
object MetricAdapter {
  implicit class MetricEvent(val payload: Metric) extends Event {
    def timestamp: Int = payload.timestamp
  }
}
```

And then simply use `val events: DataSet[Event] = env.fromElements(...)` in the main.

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
FYI, I have posted the same question (a bit more polished) at
https://stackoverflow.com/questions/61193662/dataset-datastream-of-type-class-interface

Also, you can find the code in this repo: https://github.com/salvalcantara/flink-events-and-polymorphism
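For anyone trying to narrow this down, here is a minimal Flink-free sketch (assuming Scala 2, and using a plain `List` as a stand-in for `DataSet`) that separates the plain Scala part of the problem from the Flink part. The object name `PlainScalaCheck` is hypothetical; everything else mirrors the definitions in the question. It shows two things: without an expected type, two unrelated case classes are unified as `Product with Serializable`, which matches the first error message; and with an expected type of `Event[_]`, the case class wrapper plus `fromPayload` converts each element as intended. That suggests, without proving it, that the remaining obstacle is the `TypeInformation[Event[_]]` evidence that `fromElements` additionally requires, as the second error message indicates.

```scala
import scala.language.implicitConversions

// Type class and raw event classes, as in the question.
trait LikeEvent[T] { def timestamp(payload: T): Int }
case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

object EventInstance {
  // Explicit types on implicit vals avoid definition-order surprises.
  implicit val logEvent: LikeEvent[Log] = new LikeEvent[Log] {
    def timestamp(log: Log): Int = log.ts
  }
  implicit val metricEvent: LikeEvent[Metric] = new LikeEvent[Metric] {
    def timestamp(metric: Metric): Int = metric.timestamp
  }
}

// Case class wrapper plus explicit implicit conversion (EventSyntax2 in the question).
object EventSyntax2 {
  case class Event[T: LikeEvent](payload: T) {
    def timestamp: Int = implicitly[LikeEvent[T]].timestamp(payload)
  }
  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}

// Hypothetical helper object: a plain List stands in for Flink's DataSet.
object PlainScalaCheck {
  import EventInstance._
  import EventSyntax2._

  // With no Event[_] expected type, the elements are only unified as
  // Product with Serializable, so no conversion is applied.
  val raw: List[Product with Serializable] =
    List(Metric(1586736000, "cpu_usage", 0.2), Log(1586736005, 1, "invalid login"))

  // With Event[_] as the expected element type, fromPayload wraps each element.
  val events: List[Event[_]] =
    List(Metric(1586736000, "cpu_usage", 0.2), Log(1586736005, 1, "invalid login"))

  // The type class instance captured at wrapping time resolves the timestamp.
  val timestamps: List[Int] = events.map(_.timestamp)
}
```

Since `List.apply` needs no extra implicit evidence, this isolates the wrapper itself from whatever Flink does when it derives type information for a case class with an existential type parameter.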