Custom Kryo serializer


Custom Kryo serializer

Boris Lublinsky
Hi,
I have several implementations of my Model trait:

trait Model {
  def score(input : AnyVal) : AnyVal
  def cleanup() : Unit
  def toBytes() : Array[Byte]
  def getType : Long
}

None of them is serializable, but they are used in the state definition.
So I implemented a custom serializer:

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor


class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
    * <p>
    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
    * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
    * be reentrant.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
    * serializer.
    *
    * @return May be null if {@link #getAcceptsNull()} is true. */

  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {

    import ModelSerializerKryo._

    val mType = input.readLong().toInt
    val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
    * serializer.
    *
    * @param value May be null if {@link #getAcceptsNull()} is true. */

  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    output.writeLong(value.getType)
    output.write(value.toBytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}

I then added the following to configure it:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

I can see checkpoint messages in the console output, but I never hit a breakpoint in the serializer.
Any suggestions?
 



Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/
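
A note on the read method in the message above, separate from the breakpoint question: reading bytes until one equals -1 is fragile. A payload byte of 0xFF is read back as -1 and would cut the model short, and Kryo's Input reports running out of data by throwing rather than by returning -1. A common alternative is to write the payload length before the payload. The sketch below shows that framing with plain java.io streams so it runs without Kryo; the object and method names (LengthPrefixedFraming, writeModel, readModel, roundTrip) are made up for illustration. In the serializer itself the corresponding calls would presumably be output.writeInt(bytes.length) on the write side and input.readInt() followed by input.readBytes(length) on the read side.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Illustrative only: all names here are hypothetical, not from the original post.
object LengthPrefixedFraming {

  // Write the model type, then the payload length, then the payload itself.
  def writeModel(out: DataOutputStream, modelType: Long, payload: Array[Byte]): Unit = {
    out.writeLong(modelType)
    out.writeInt(payload.length)
    out.write(payload)
  }

  // Read back exactly what writeModel wrote; no sentinel byte is needed.
  def readModel(in: DataInputStream): (Long, Array[Byte]) = {
    val modelType = in.readLong()
    val length = in.readInt()
    val payload = new Array[Byte](length)
    in.readFully(payload)
    (modelType, payload)
  }

  // Serialize to an in-memory buffer and read it back.
  def roundTrip(modelType: Long, payload: Array[Byte]): (Long, Array[Byte]) = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeModel(out, modelType, payload)
    out.flush()
    readModel(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
  }
}
```

With this framing a payload that contains -1 bytes, for example Array[Byte](1, -1, 2), survives the round trip unchanged, because the reader never inspects payload values to find the end.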


Re: Custom Kryo serializer

Chesnay Schepler
Hello,

I assume you're passing the class of your serializer in a StateDescriptor constructor.

If so, you could add a breakpoint in StateDescriptor#initializeSerializerUnlessSet,
and check what typeInfo is created and which serializer is created as a result.

One thing you could try right away is registering your serializer for the Model implementations,
instead of the trait.

Regards,
Chesnay

On 14.07.2017 15:50, Boris Lublinsky wrote:



Re: Custom Kryo serializer

Boris Lublinsky
Thanks for the reply, but I am using it for raw state rather than managed state.
In my implementation I have the following:

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]{

  // The managed keyed state see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _
  // The raw state - https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel : Option[Model] = None
  var newModel : Option[Model] = None

where currentModel and newModel are instances of the trait for which I implemented the serializer.

The Flink documentation says: "Raw State is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s data structures and sees only the raw bytes."

So I assumed that I need to provide a serializer for this.
Am I missing something?





Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: [hidden email]







Re: Custom Kryo serializer

Chesnay Schepler
Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function will have to implement
the CheckpointedFunction interface, and create a ValueStateDescriptor that you register in initializeState.
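
A rough sketch of what this suggestion could look like, assuming Flink 1.3 on the classpath and a function applied to a keyed connected stream (as the name DataProcessorKeyed suggests). The Model, WineRecord and ModelToServe definitions below are simplified, hypothetical stand-ins for the types in the thread, and the state name "currentModel" is arbitrary. The model is held in a ValueState obtained in initializeState instead of in a plain var; as far as I understand, creating the descriptor from classOf[Model] lets Flink fall back to its Kryo-based serializer, which is the point where a default serializer registered with addDefaultKryoSerializer would be consulted.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical, simplified stand-ins for the types used in the thread.
trait Model { def score(input: Double): Double }
case class WineRecord(dataType: String, feature: Double)
case class ModelToServe(dataType: String, model: Model)

class DataProcessorKeyed
    extends CoProcessFunction[WineRecord, ModelToServe, Double]
    with CheckpointedFunction {

  // Managed keyed state holding the model; Flink checkpoints and restores it.
  private var currentModel: ValueState[Model] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Register the descriptor; the serializer is derived from the given class.
    val descriptor = new ValueStateDescriptor[Model]("currentModel", classOf[Model])
    // The keyed state store is only usable when the function runs on a keyed stream.
    currentModel = context.getKeyedStateStore.getState(descriptor)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Nothing to copy here: the model already lives in managed state.
  }

  override def processElement1(
      record: WineRecord,
      ctx: CoProcessFunction[WineRecord, ModelToServe, Double]#Context,
      out: Collector[Double]): Unit = {
    // value() returns null until a model has arrived for the current key.
    Option(currentModel.value()).foreach(m => out.collect(m.score(record.feature)))
  }

  override def processElement2(
      update: ModelToServe,
      ctx: CoProcessFunction[WineRecord, ModelToServe, Double]#Context,
      out: Collector[Double]): Unit = {
    currentModel.update(update.model)
  }
}
```

For state that is not scoped to a key, the analogous route would go through context.getOperatorStateStore with a ListStateDescriptor rather than a ValueStateDescriptor.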

On 19.07.2017 15:28, Boris Lublinsky wrote: