Scala case classes with a generic parameter

James Bucher
Hi,

I have been trying to get a case class with a generic parameter working with Flink 1.0.3 and have been running into trouble. When I compile, I get the following error:
debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40: error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
[ERROR]           .apply(new AggregateOrigins)

I am importing org.apache.flink.api.scala._, and the generic type parameter is declared as [T: TypeInformation], as suggested here: https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html

The full code for the program is as follows:
package com.example.flink.jobs

import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.api.scala._

object CaseClassWithGeneric {
  case class TestGen[T: TypeInformation](item: T)

  class AggregateOrigins extends WindowFunction[String, TestGen[String], String, TimeWindow] {
    def apply(key: String, win: TimeWindow, values: Iterable[String], col: Collector[TestGen[String]]): Unit = {
      values.foreach(x => { })
      col.collect(new TestGen[String]("Foo"))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    val messageStream = env.addSource(
        new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
      .keyBy(s => s)
      .timeWindow(Time.days(1))
      .apply(new AggregateOrigins)
    messageStream.print()
    env.execute("Simple Job")
  }
}
When I dug into the definition of apply(), I found the following:
def apply[R: TypeInformation](
    function: WindowFunction[T, R, K, W]): DataStream[R] = {

  val cleanFunction = clean(function)
  val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
  asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
}
As far as I can tell, TestGen[String] should correspond to R in apply's [R: TypeInformation]. Am I missing something, or is it not possible to define a case class with a generic parameter?
Thanks,
James Bucher
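The mismatch in the question can be reproduced in plain Scala, without Flink. In this minimal sketch, `TypeInfo` is a hypothetical stand-in for `TypeInformation` and `applyLike` is a hypothetical stand-in for `WindowedStream.apply`. A context bound `[T: TypeInfo]` on a case class only adds an implicit constructor parameter of type `TypeInfo[T]`; it does not provide a `TypeInfo[TestGen[T]]`, which is what a method declared with `[R: TypeInfo]` asks for when `R` is `TestGen[String]`.

```scala
// Hypothetical stand-in for Flink's TypeInformation (not the real API).
trait TypeInfo[A] { def name: String }

object TypeInfo {
  // Evidence for String, found automatically via this companion object.
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { val name = "String" }
}

// [T: TypeInfo] desugars to: case class TestGen[T](item: T)(implicit ev: TypeInfo[T])
// It demands a TypeInfo[String] when a TestGen[String] is built,
// but it does not define any TypeInfo[TestGen[T]].
case class TestGen[T: TypeInfo](item: T)

object ContextBoundDemo {
  // Hypothetical analogue of apply[R: TypeInformation]:
  // the caller must supply evidence for the result type R.
  def applyLike[R: TypeInfo](value: R): String =
    s"${implicitly[TypeInfo[R]].name} -> $value"

  def main(args: Array[String]): Unit = {
    // Compiles: R = String, and TypeInfo[String] exists.
    println(applyLike("Foo"))

    // Compiles: constructing a TestGen only needs TypeInfo[String].
    println(TestGen("Foo").item)

    // Does not compile if uncommented: R = TestGen[String], and no
    // TypeInfo[TestGen[String]] is in scope.
    // println(applyLike(TestGen("Foo")))
  }
}
```

Uncommenting the last call should fail with an error of the same shape as the one in the question; on Scala 2 it reads "could not find implicit value for evidence parameter of type TypeInfo[TestGen[String]]".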

Re: Scala case classes with a generic parameter

Aljoscha Krettek
Hi James,

The TypeInformation must be available at the call site, not in the case class definition. In your WindowFunction you are using a TestGen[String], so it should suffice to add this line at some point before the call to apply():

implicit val testGenType = createTypeInformation[TestGen[String]]
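A plain-Scala sketch of the same fix, again using hypothetical `TypeInfo` and `applyLike` stand-ins rather than Flink's real API: once an implicit `TypeInfo[TestGen[String]]` is in scope at the call site, the evidence for `R = TestGen[String]` resolves. The instance is written by hand purely for illustration; in Flink, the line above obtains it from `createTypeInformation`, imported via `org.apache.flink.api.scala._`.

```scala
// Hypothetical stand-in for Flink's TypeInformation (not the real API).
trait TypeInfo[A] { def name: String }

object TypeInfo {
  // Evidence for String, found automatically via this companion object.
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { val name = "String" }
}

// Same shape as in the question: the bound only requires TypeInfo[T] on construction.
case class TestGen[T: TypeInfo](item: T)

object CallSiteDemo {
  // Hypothetical analogue of apply[R: TypeInformation]: needs evidence for R.
  def applyLike[R: TypeInfo](value: R): String =
    s"${implicitly[TypeInfo[R]].name} -> $value"

  def run(): String = {
    // Analogue of: implicit val testGenType = createTypeInformation[TestGen[String]]
    // Declared before the call, so it is eligible when applyLike needs it.
    implicit val testGenInfo: TypeInfo[TestGen[String]] =
      new TypeInfo[TestGen[String]] { val name = "TestGen[String]" }

    // R is inferred as TestGen[String]; testGenInfo satisfies the context bound.
    applyLike(TestGen("Foo"))
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints "TestGen[String] -> TestGen(Foo)"
}
```

The hand-written instance covers exactly one concrete type, which mirrors the single `implicit val` suggested above for `TestGen[String]`.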

Hope that helps.

Best,
Aljoscha

In reply to James Bucher's message of Wed, 1 Jun 2016 at 20:11.