In a KeyedCoProcessFunction, I am managing keyed state that consists of
third-party library models. These models are created on reception of new data on the control stream within `processElement1`. Because the models are self-evolving, in the sense that they have their own internal state, I need to make sure they are serialized into `modelsBytes` whenever their state changes. My first attempt goes like this:

```scala
class MyOperator
    extends KeyedCoProcessFunction[String, Control, Data, Prediction]
    with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[String, Model] = _
  // For serialization purposes
  @transient private var modelsBytes: MapState[String, Array[Byte]] = _
  // Set on restore; models are lazily rebuilt from `modelsBytes` on first use
  @transient private var restorePending: Boolean = false

  override def processElement1(
      control: Control,
      ctx: KeyedCoProcessFunction[String, Control, Data, Prediction]#Context,
      out: Collector[Prediction]): Unit = {
    if (restorePending) restoreModels()
    // - Create a new model out of the `control` element
    // - Add it to the `models` keyed state
  }

  override def processElement2(
      data: Data,
      ctx: KeyedCoProcessFunction[String, Control, Data, Prediction]#Context,
      out: Collector[Prediction]): Unit = {
    if (restorePending) restoreModels()
    // - Send the `data` element to the corresponding models
    //   (this updates their internal state)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Suspicious, wishful-thinking code that compiles and runs just fine
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[String, Array[Byte]](
        "modelsBytes", classOf[String], classOf[Array[Byte]])
    )
    if (context.isRestored) restorePending = true
  }
}
```

So, the idea is to use `snapshotState` to overwrite the *keyed* state entries in `modelsBytes`. The reason I am trying this approach is that serializing the models (`model.toBytes`) might be an expensive operation, so I would prefer to do it once per model when a checkpoint comes. The problem with this approach is that it might be inherently/conceptually wrong.
Here is why: even though the code within `snapshotState` compiles and runs just fine, note that I am referring to a piece of keyed state without a keyed context being passed in, so it is not clear at all which key I am actually working on. I have written a small test to verify the checkpoints, and I have observed that from time to time I get an empty state back, even though the `modelsBytes` entries were updated in `snapshotState`. So it seems that snapshotting my models like this is not reliable at all.

What confuses me is that the user is perfectly allowed to do this. Maybe the `put` method should raise an exception to make it clear that a keyed context is required in the first place; otherwise it gives false hope and might lead to hard-to-spot bugs. As a matter of fact, shouldn't this be considered a bug?

The other option I have is, of course, to serialize my models in `processElement2`, right after sending new data elements to them. However, continuously serializing my models to update `modelsBytes` might be costly. What would be the most efficient way to handle this scenario?

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
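For reference, the eager alternative described above (re-serializing a model right after each data element updates it) can be sketched in plain Scala, independent of Flink. `Model`, `toBytes`, and `fromBytes` here are hypothetical stand-ins for the third-party library, and the mutable map plays the role of the `MapState`:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a self-evolving third-party model.
class Model(var count: Long) {
  def update(x: Double): Unit = count += 1             // internal state evolves
  def toBytes: Array[Byte] = BigInt(count).toByteArray // assume: expensive in practice
}

object Model {
  def fromBytes(b: Array[Byte]): Model = new Model(BigInt(b).toLong)
}

object EagerSnapshot {
  val models = mutable.HashMap.empty[String, Model]
  val modelsBytes = mutable.HashMap.empty[String, Array[Byte]] // stands in for MapState

  // Mirrors processElement2: update the model, then immediately re-serialize it,
  // so modelsBytes is always checkpoint-ready, at the cost of one toBytes per element.
  def onData(key: String, x: Double): Unit = {
    val m = models.getOrElseUpdate(key, new Model(0L))
    m.update(x)
    modelsBytes(key) = m.toBytes
  }
}
```

This is the safe-but-costly extreme: every element pays the serialization price, which is exactly the overhead the question is trying to avoid.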
I guess another option, not mentioned in my question, could be to use a custom
serializer for the models. That way, I would not need to handle serialization myself within the process function, and the snapshots of my models would be taken only once per checkpoint, as desired.
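A rough, Flink-free sketch of what such a custom serializer would delegate to. In a real job this logic would live inside a `TypeSerializer[Model]` (its `serialize`/`deserialize` methods, writing to Flink's `DataOutputView`), so that `toBytes` is only invoked when the state backend actually snapshots the value. `Model`, `toBytes`, and `fromBytes` are again hypothetical stand-ins:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical self-evolving model (stand-in for the third-party type).
final class Model(var count: Long) {
  def toBytes: Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeLong(count)
    out.flush()
    bos.toByteArray
  }
}

object Model {
  def fromBytes(bytes: Array[Byte]): Model =
    new Model(new DataInputStream(new ByteArrayInputStream(bytes)).readLong())
}

// Core of a custom serializer: length-prefixed delegation to the model's own
// (de)serialization. A Flink TypeSerializer[Model] would do exactly this
// against the DataOutputView/DataInputView it is handed.
object ModelSerde {
  def serialize(m: Model, out: DataOutputStream): Unit = {
    val b = m.toBytes
    out.writeInt(b.length) // length prefix so deserialize knows how much to read
    out.write(b)
  }

  def deserialize(in: DataInputStream): Model = {
    val b = new Array[Byte](in.readInt())
    in.readFully(b)
    Model.fromBytes(b)
  }
}
```

One caveat with this route: whether it actually saves work depends on the state backend, since a backend that keeps state serialized at rest would invoke the serializer on every access, not just at checkpoints.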
In reply to this post by Salva Alcántara
Yet another option would be to use operator state instead, but that looks
trickier to me.