json mapper

Peter Ertl
Hi Flink users,

I just wanted to ask if this kind of Scala map function is correct?

object JsonMapper {
  private val mapper: ObjectMapper = new ObjectMapper()
}

class JsonMapper extends MapFunction[String, ObjectNode] {
  override def map(value: String): ObjectNode =
    JsonMapper.mapper.readValue(value, classOf[ObjectNode])
}

Is using a static reference to ObjectMapper fine, or will this cause issues on a distributed cluster (with checkpoints, state serialization, or anything else)?

Or should I instead use a non-transient property initialized in the constructor (ObjectMapper is java.io.Serializable)?

Or should I initialize it in RichMapFunction.open and keep it in a transient property?

I am also wondering whether replacing 'class' with 'object' (=> singleton)

object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }

is ok (the mapper is actually stateless, so there is no obvious need to re-instantiate it again and again).

Thanks and best regards
Peter
Re: json mapper

Nico Kruber
Hi Peter,
I'm no Scala developer but I may be able to help with some concepts:

* a static reference used inside a [Map]Function can cause problems when
executed in parallel in the same JVM, e.g. a TaskManager with multiple slots,
if this static object is stateful and not thread-safe
* additionally, not all parallel instances of your map may be executed in the
same JVM, e.g. on multiple TaskManagers, so you cannot assume that the state
of the JsonMapper is consistent among them
* if the ObjectMapper does not store any state that is worth recovering during
a failure (none that I see from
https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html
if that is the one you are using), then you don't need to put it into Flink
state but can either initialise it as a (non-static) member of your
MapFunction class or even in your map function itself
* for the correct use of keyed/non-keyed state, please refer to my other email
or [1]
* for 'class' vs. 'object': if you're using
com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have
state again ("It will use instances of JsonParser and JsonGenerator for
implementing actual reading/writing of JSON." from the docs), but in general
it is a good question whether the singleton would work for stateless operators
and whether it actually improves performance.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
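
For illustration, the two non-static alternatives discussed above (a plain member versus a transient field set up in open()) might look roughly like the sketch below. The class names MemberJsonMapper and OpenJsonMapper are made up for this example, and the open(parameters: Configuration) signature is the one from Flink releases of that era (such as the 1.3 docs linked above); later releases may differ.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration

// Variant A (hypothetical name): the mapper is a plain, non-static member.
// It is serialized together with the function when the job is shipped, which
// relies on ObjectMapper being java.io.Serializable as noted in the question.
class MemberJsonMapper extends MapFunction[String, ObjectNode] {
  private val mapper: ObjectMapper = new ObjectMapper()

  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}

// Variant B (hypothetical name): the mapper is transient and created in
// open(), so it is never serialized; each parallel instance builds its own
// mapper before the first call to map().
class OpenJsonMapper extends RichMapFunction[String, ObjectNode] {
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

In both variants the mapper is not part of Flink's checkpointed state; it is simply rebuilt or deserialized whenever the function instance is created.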

Re: json mapper

Eron Wright
I think your snippet looks good.  The Jackson ObjectMapper is designed to be reused by numerous threads, and routinely stored as a static field.  It is somewhat expensive to create.

Hope this helps,
-Eron
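
As a rough sketch of the point above, a single ObjectMapper held in a singleton can be read from several threads at once, provided it is fully configured before first use and not reconfigured afterwards. The names SharedMapper, SharedMapperDemo and parseConcurrently are invented for this example, and the thread pool merely stands in for parallel task slots within one JVM.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import java.util.concurrent.{Callable, Executors}

// Hypothetical holder for one JVM-wide mapper. Any configuration should
// happen here, before the first read, and never change afterwards.
object SharedMapper {
  val mapper: ObjectMapper = new ObjectMapper()
}

object SharedMapperDemo {
  // Parses every input on a fixed-size thread pool, with all threads sharing
  // the single mapper instance. Results come back in input order.
  def parseConcurrently(inputs: Seq[String], threads: Int): List[ObjectNode] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = inputs.toList.map { json =>
        pool.submit(new Callable[ObjectNode] {
          override def call(): ObjectNode =
            SharedMapper.mapper.readValue(json, classOf[ObjectNode])
        })
      }
      // Block until every task has finished and collect the parsed nodes.
      futures.map(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

The trade-off is one construction cost per JVM instead of one per function instance, in exchange for treating the shared mapper as read-only after setup.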
