|
The docs provide a reasonably good overview of how to interact with managed state: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state.html#using-managed-operator-state
To use the custom serializer, you can supply the type class in the
StateDescriptor constructor when initializing the state:
private ListState<Model> currentModel = null;
...
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Model> descriptor = new ListStateDescriptor<>(
        "current-model",
        Model.class
    );
    currentModel = context.getOperatorStateStore().getListState(descriptor);
}
...
Note that the above is for unkeyed state. For keyed state
you would call context.getKeyedStateStore() instead.
Do not be confused that I'm using a ListState; that's just how the
interface works. The idea is that if you were to decrease the
parallelism upon restore, a single instance of your function may
receive multiple states.
Anyway, you can add a Model to the state using ListState#add and
clear the entire ListState with ListState#clear.
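To make the point about decreased parallelism concrete, here is a minimal plain-Java sketch with no Flink dependency. It only simulates how list entries checkpointed by three function instances could end up on two instances after a restore. The class name, the `redistribute` method, and the round-robin assignment are assumptions for illustration; Flink's actual redistribution may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: this is not Flink code.
final class ListStateRedistributionSketch {

    /** Spreads every checkpointed list entry over the new instances, round-robin. */
    static <T> List<List<T>> redistribute(List<List<T>> oldInstanceStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> instanceState : oldInstanceStates) {
            for (T entry : instanceState) {
                result.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three instances, each holding one model in its list state.
        List<List<String>> before = Arrays.asList(
            Arrays.asList("model-a"),
            Arrays.asList("model-b"),
            Arrays.asList("model-c"));
        // Restoring with parallelism 2: one instance now holds two entries.
        System.out.println(redistribute(before, 2)); // prints [[model-a, model-c], [model-b]]
    }
}
```

This is why the restored state is exposed as a list: after rescaling, the function has to decide what to do when it finds more than one entry.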
I'm not sure though how well you can introduce options here.
Hope this helps,
Chesnay
On 24.07.2017 16:31, Boris Lublinsky wrote:
Thanks Chesnay,
Can you, please, point me to any example?
Copy of a mail I sent to the user mailing list only:
Raw state can only be used when implementing an operator, not a function.
For functions you have to use Managed Operator State.
Your function will have to implement the CheckpointedFunction interface,
and create a ValueStateDescriptor that you register in initializeState.
On 24.07.2017 16:26, Boris Lublinsky wrote:
Is there a chance this can be answered?
Begin forwarded message:
Subject: Re: Custom Kryo serializer
Date: July 19, 2017 at 8:28:16 AM CDT
Thanks for the reply, but I am not using it for managed state, but
rather for the raw state.
In my implementation I have the following
Where the current and new model are instances of the trait for which I
implemented the serializer.
“Raw State is state that operators keep in their own data structures.
When checkpointed, they only write a sequence of bytes into the
checkpoint. Flink knows nothing about the state’s data structures and
sees only the raw bytes.”
So I was assuming that I need to provide a serializer for this.
Am I missing something?
---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: [hidden email]
Hello,
I assume you're passing the class of your serializer in a
StateDescriptor constructor.
If so, you could add a breakpoint in
StateDescriptor#initializeSerializerUnlessSet, and check what typeInfo
is created and which serializer is created as a result.
One thing you could try right away is registering your serializer for
the Model implementations, instead of the trait.
Regards,
Chesnay
On 14.07.2017 15:50, Boris Lublinsky wrote:
Hi
I have several implementations of my Model trait,
trait Model {
  def score(input : AnyVal) : AnyVal
  def cleanup() : Unit
  def toBytes() : Array[Byte]
  def getType : Long
}
None of them are serializable, but they are used in the state
definition.
So I implemented a custom serializer
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads the type tag and the length-prefixed payload, then restores the
    * model through the factory registered for that type. */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._
    val mType = input.readLong().toInt
    // Read exactly the number of bytes that write() recorded. Scanning for a
    // -1 sentinel would stop early on any 0xFF byte inside the payload.
    val length = input.readInt()
    val bytes = input.readBytes(length)
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the type tag, the payload length, and the payload bytes. */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    val bytes = value.toBytes
    output.writeLong(value.getType)
    output.writeInt(bytes.length)
    output.write(bytes)
  }
}

object ModelSerializerKryo {
  // Maps each model type id to the companion object that can restore it.
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}
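A side note on the wire format of a serializer like this one: delimiting the payload by scanning for a sentinel byte such as -1 is fragile, because 0xFF is a legal value in arbitrary binary data. Writing a length prefix avoids that. The sketch below shows the idea in plain Java with `DataOutputStream` and `DataInputStream` instead of Kryo's `Input` and `Output`; the class name `LengthPrefixedFraming`, the `Frame` holder, and the `encode`/`decode` methods are hypothetical names for illustration, not part of Kryo or Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical illustration only: plain JDK streams stand in for Kryo's Input/Output.
final class LengthPrefixedFraming {

    /** A decoded frame: the model type tag plus the opaque model bytes. */
    static final class Frame {
        final long type;
        final byte[] payload;

        Frame(long type, byte[] payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** Layout: 8-byte type tag, 4-byte payload length, then the payload itself. */
    static byte[] encode(long type, byte[] payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(type);
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads back exactly as many payload bytes as the length prefix announces. */
    static Frame decode(byte[] encoded) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            long type = in.readLong();
            int length = in.readInt();
            byte[] payload = new byte[length];
            in.readFully(payload);
            return new Frame(type, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The payload deliberately contains -1 (0xFF), which a sentinel scan would misread.
        byte[] payload = {1, -1, 2};
        Frame frame = decode(encode(7L, payload));
        System.out.println(frame.type + " " + Arrays.toString(frame.payload)); // prints 7 [1, -1, 2]
    }
}
```

The same layout carries over to Kryo by writing the length with `Output#writeInt` and reading the payload with `Input#readBytes(length)`.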
And added the following
// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
to configure it.
I can see checkpoint messages at the output console, but I never hit a
breakpoint in the serializer.
Any suggestions?
|