MalformedClassName for scala case class
Posted by
Georg Heiler on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/MalformedClassName-for-scala-case-class-tp36521.html
Hi,
why can't I register the stream as a table and get a MalformedClassName exception?
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
"tweets-raw-json",
serializer,
properties
).setStartFromEarliest() // TODO experiment with different start values
)
case class Foo(lang: String, count: Int)
val r = stream
.map(e => {
Foo(e.get("value").get("lang").asText(), 1)
})
.keyBy(_.lang)
.timeWindow(Time.seconds(10))
.sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)
Best,
Georg