Hi flink users,
I just wanted to ask if this kind of scala map function is correct? object JsonMapper {
|
Hi Peter,
I'm no Scala developer but I may be able to help with some concepts:

* a static reference used inside a [Map]Function will certainly cause problems when executed in parallel in the same JVM, e.g. a TaskManager with multiple slots, depending on whether this static object is stateful and/or thread-safe
* additionally, not all parallel instances of your map may be executed in the same JVM, e.g. on multiple TaskManagers, so you cannot assume that the state of the JsonMapper is consistent among them
* if the ObjectMapper does not store any state that is worth recovering during a failure (none that I see from https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html if that is the one you are using), then you don't need to put it into flink state but can either initialise it as a (non-static) member of your MapFunction class or even in your map function itself
* for the correct use of keyed/non-keyed state, please refer to my other email or [1]
* for 'class' vs. 'object': if you're using com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have state again ("It will use instances of JsonParser and JsonGenerator for implementing actual reading/writing of JSON." from the docs) but in general, it is a good question whether the singleton would work for stateless operators and whether it actually improves performance.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> Hi flink users,
>
> I just wanted to ask if this kind of scala map function is correct?
>
> object JsonMapper {
>   private val mapper: ObjectMapper = new ObjectMapper()
> }
>
> class JsonMapper extends MapFunction[String, ObjectNode] {
>   override def map(value: String): ObjectNode =
>     JsonMapper.mapper.readValue(value, classOf[ObjectNode])
> }
>
> Is using a static reference to ObjectMapper fine or will this cause issues
> on a distributed cluster / with checkpoint / serializing state / whatever ?
>
> Or should I instead use a non-transient property initialized in ctor
> (ObjectMapper is java.io.Serializable) ?
>
> Or should I initialize it with RichMapFunction.open into a transient
> property?
>
> Also I am wondering if replacing 'class' with 'object' (=> singleton)
>
> object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }
>
> is ok (actually the mapper is stateless so no obvious need to re-instantiate
> it again and again ? )
>
> Thanks and best regards
> Peter
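
For reference, the RichMapFunction.open variant asked about above could look roughly like the following. This is only a sketch, assuming the Flink 1.3 Java API (RichMapFunction with open(Configuration)) and Jackson's com.fasterxml.jackson.databind.ObjectMapper; the class name JsonMapper is taken from the question, everything else is illustrative. Each parallel instance builds its own mapper on the TaskManager, so nothing is shared between slots and the mapper is never serialized with the function.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch only: one ObjectMapper per parallel instance of the function.
class JsonMapper extends RichMapFunction[String, ObjectNode] {

  // @transient: the field is skipped when Flink serializes the function
  // to ship it to the TaskManagers; it is populated in open() instead.
  @transient private var mapper: ObjectMapper = _

  // Called once per parallel instance before any call to map().
  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  // Parses one JSON string into a Jackson tree node.
  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

It would be used like the original function, e.g. stream.map(new JsonMapper()). Compared with the companion-object version, the cost is one ObjectMapper per parallel instance instead of one per JVM.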
I think your snippet looks good. The Jackson ObjectMapper is designed to be reused by numerous threads, and routinely stored as a static field. It is somewhat expensive to create.

Hope this helps,
-Eron

On Thu, Aug 3, 2017 at 7:46 AM, Nico Kruber <[hidden email]> wrote:
> Hi Peter,