Parsing source JSON String as Scala Case Class

Jack Huang-2
Hi all,

I want to read a source of JSON strings as Scala case classes, without having to write a serde for every case class I have. The idea is:
val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event]), kafkaProp))

I tried implementing my own JsonSerde with Jackson and with Gson, but in both cases I get the error:
Task not serializable
    org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
    com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)
It seems that both Jackson and Gson have classes that are not serializable.

I couldn't find any other solution to perform this JSON-to-Case-Class parsing, yet it seems a very basic need. What am I missing?


Thanks,
Jack



Re: Parsing source JSON String as Scala Case Class

Stephan Ewen
If the class has non-serializable members, you need to initialize them "lazily", so that they are created only once the object is already in the distributed execution (that is, after it has been serialized and distributed).

Making a Scala 'val' a 'lazy val' often does the trick (at minimal performance cost).
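
A minimal sketch of the lazy-initialization idea, using only the standard library so that it runs without Flink, Jackson, or Gson. The names FakeJsonParser, EagerSerde, LazySerde, and SerdeCheck are hypothetical and stand in for the real parser and serde classes. The sketch also adds @transient, which goes slightly beyond the advice above: a plain 'lazy val' serializes fine as long as it has not been evaluated yet, while '@transient lazy val' also covers the case where it was already evaluated before serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a JSON parser object (for example a Gson
// instance); deliberately NOT Serializable.
class FakeJsonParser {
  def parse(json: String): String = json.trim
}

// Fails to serialize: the parser is an ordinary field, so it is part of the
// object's serialized state.
class EagerSerde extends Serializable {
  val parser = new FakeJsonParser
  def deserialize(bytes: Array[Byte]): String =
    parser.parse(new String(bytes, "UTF-8"))
}

// Serializes fine: the parser is skipped during serialization (@transient)
// and created on first use, including after deserialization (lazy).
class LazySerde extends Serializable {
  @transient private lazy val parser = new FakeJsonParser
  def deserialize(bytes: Array[Byte]): String =
    parser.parse(new String(bytes, "UTF-8"))
}

object SerdeCheck {
  // Round-trips an object through Java serialization, similar to what
  // happens when a function object is shipped to the workers.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Here SerdeCheck.roundTrip(new LazySerde) returns a working copy, while SerdeCheck.roundTrip(new EagerSerde) throws java.io.NotSerializableException. In the Flink job, the same pattern would presumably go inside the JsonSerde passed to the Kafka consumer, with the Jackson or Gson object held as a '@transient lazy val'.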

In reply to Jack Huang's message of Thu, Aug 4, 2016 at 3:56 AM.

Re: Parsing source JSON String as Scala Case Class

Jack Huang-2
Thanks Stephan. "lazy val" does the trick.

In reply to Stephan Ewen's message of Thu, Aug 4, 2016 at 2:33 AM.