MalformedClassName for Scala case class

Posted by Georg Heiler on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/MalformedClassName-for-scala-case-class-tp36521.html

Hi,

Why do I get a MalformedClassName exception when I try to register the stream as a table?

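import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

// `senv` and `properties` (the Kafka consumer config) are defined elsewhere.
val serializer = new JSONKeyValueDeserializationSchema(false) // false = no Kafka metadata
val stream = senv.addSource(
  new FlinkKafkaConsumer(
    "tweets-raw-json",
    serializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)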

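import org.apache.flink.streaming.api.windowing.time.Time

case class Foo(lang: String, count: Int)

// Count tweets per language over 10-second windows.
val r = stream
  .map(e => Foo(e.get("value").get("lang").asText(), 1))
  .keyBy(_.lang)
  .timeWindow(Time.seconds(10))
  .sum("count")
r.print()

// Registering the stream as a table is the step that fails.
stenv.registerDataStream("tweets_json", r)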

Best,
Georg