Given:
```scala
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[String, String](
      new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }
}
```

It happens that `modelsBytes.clear()` raises an exception when there is no active key. This happens when I start the application from scratch without any data on the input streams. So, when the time for a checkpoint comes, I get this error:

`java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.`

However, when the input stream contains data, checkpoints work just fine. I am a bit confused about this because `snapshotState` does not provide a keyed context (contrary to `processElement1` and `processElement2`, where the current key is accessible via `ctx.getCurrentKey`), so it seems to me that the calls to `clear` and `put` within `snapshotState` should always fail, since they are supposed to work only within a keyed context. Can anyone clarify whether this is actually the expected behaviour?
|
Hi,

The exception `No key set. This method should not be called outside of a keyed context.` means that the current key is null. In my opinion, something is wrong if an exception is thrown just because no data has arrived. Could you please share the full stack trace and a minimal reproducible job for this issue?

Best,
Congxian

Salva Alcántara <[hidden email]> wrote on Sunday, December 1, 2019 at 3:01 PM:
|
Hi Salva

The root cause is a confusion between keyed state and operator state. There is no 'currentKey' in operator state, which means PartitionableListState#clear() will clear the whole state. However, there is always a 'currentKey' in keyed state, which means 'state#clear()' only removes the entry scoped to the current runtime key.

In your example code, the state to clear is a MapState (not a list state) and therefore must be keyed state. If your job has not processed any record, no 'currentKey' has been set [1] for the 'modelsBytes' state, which leads to the NPE when calling 'state#clear()'. Moreover, the 'snapshotState' and 'initializeState' interfaces are used mainly to snapshot and initialize operator state.

Last but not least, even if you could ensure that at least one record is processed before 'snapshotState' is called, your program logic would still be unclear: you cannot control which entry in your state gets cleared, since you cannot control the current key, which is set by record processing.

You could refer to TwoPhaseCommitSinkFunction [2] to see what state can be cleared during snapshotState.

Best
Yun Tang
|
In reply to this post by Congxian Qiu
Thanks Congxian. From what I've read, it seems that using keyed state in `snapshotState` is incorrect. What confuses me is that if I do something like this

```
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  if (models.nonEmpty) {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }
}
```

then, when there is data (`models` is populated within `processElement1`), the `clear` and subsequent calls to `put` work just fine. This seems like a bug to me, as others have pointed out in this somewhat extended question posted on Stack Overflow: https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models

Do you think the fact that `clear` works within `snapshotState` under certain circumstances is indeed a bug?

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
|
In reply to this post by Yun Tang
Hi Yun,
Thanks for your reply. You mention that "'snapshotState' and 'initializeState' interfaces are used mainly to snapshot and initialize for operator state", but "mainly" is not "exclusively", right? So my question is whether doing something like this is valid and makes sense:

```
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  if (models.nonEmpty) {
    modelsBytes.clear()
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }
}
```

Indeed, the above code seems to work well, so it looks like a bug that `clear` sometimes works and sometimes does not, as I noted in my reply to Congxian and as others have noted in this extended question posted on Stack Overflow: https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models
|
Hi Salva
As I pointed out, your program logic is unclear if you call 'state.clear()' within 'snapshotState', because you do not know what the current key is at that point. Hence, I don't think this approach makes sense.

From my point of view, the fact that 'clear' sometimes works in your code is not a bug in the current Flink framework. Currently, the current key is set when a record is processed. However, Flink does not reset the current key to null afterwards, since there is no such life cycle for the current key at the moment. There seems to be no benefit in introducing one, and it might cause a performance regression because more steps would be needed.

Best
Yun Tang

On 12/2/19, 9:29 PM, "Salva Alcántara" <[hidden email]> wrote:
|
Thanks a lot Yun!
|
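The behaviour discussed in this exchange (a failure before the first record, and a `clear` that "works" afterwards but only for whichever key happened to be processed last) can be mimicked with a small toy model. This is only a sketch in plain Scala: `ToyKeyedBackend` and `ToyMapState` are hypothetical names invented for illustration and are not Flink classes, and the model deliberately ignores everything else a real state backend does.

```scala
import scala.collection.mutable
import scala.util.Try

// Toy stand-in for a keyed state backend (NOT Flink's implementation):
// it remembers one "current key" and scopes every state access to it.
final class ToyKeyedBackend[K] {
  private var currentKey: Option[K] = None

  // A runtime would call this before processing each record.
  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // Fails with the message quoted in the thread if no key was ever set.
  def requireKey(): K = currentKey.getOrElse(
    throw new NullPointerException(
      "No key set. This method should not be called outside of a keyed context."))
}

// Toy keyed map state: one inner map per stream key.
final class ToyMapState[K, UK, UV](backend: ToyKeyedBackend[K]) {
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]

  def put(userKey: UK, value: UV): Unit =
    perKey.getOrElseUpdate(backend.requireKey(), mutable.Map.empty[UK, UV])
      .update(userKey, value)

  // Removes only the entries scoped to the current key.
  def clear(): Unit = {
    perKey.remove(backend.requireKey())
    ()
  }

  def entriesFor(key: K): Map[UK, UV] =
    perKey.get(key).map(_.toMap).getOrElse(Map.empty)
}

object ToyKeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val backend = new ToyKeyedBackend[String]
    val state = new ToyMapState[String, String, Int](backend)

    // "Checkpoint" before any record arrived: no key set, so clear() fails.
    println(Try(state.clear()).isFailure) // true

    // Process one record for key "a", then one for key "b".
    backend.setCurrentKey("a"); state.put("m", 1)
    backend.setCurrentKey("b"); state.put("m", 2)

    // "Checkpoint" now: the key is still "b" (nobody reset it), so clear()
    // succeeds but only touches the state scoped to "b".
    state.clear()
    println(state.entriesFor("a")) // Map(m -> 1)
    println(state.entriesFor("b")) // Map()
  }
}
```

In this toy, nothing resets the current key after a record is processed, which is why the second "checkpoint" does not fail yet still does something the caller cannot predict.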
In reply to this post by Yun Tang
Hi Yun,
In the end, I left the code like this

```
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  for ((k, model) <- models) {
    modelsBytes.put(k, model.toBytes(v))
  }
}
```

I have verified with a simple test that most of the time checkpoints seem to work fine. However, from time to time the map state is not saved properly (I get an empty map state). So it looks like updating keyed state like that within `snapshotState` is conceptually wrong; indeed, this method does not receive any keyed context to start with. Because of this, I think the user should not even be allowed to invoke `put` (nor `clear`) on the map state object. That would make things less confusing.

The reason why I am trying to serialize my (keyed state) models inside `snapshotState` is that these models are self-evolving and possess their own (time-varying) state; otherwise I could just serialize them once after creation in `processElement1`. So, given this situation, how could I handle my use case? Ideally, I should only serialize them when checkpoints are taken; in particular, I want to avoid having to serialize them after every element received in `processElement2` (the state of my models changes with each new element processed there). Maybe I cannot achieve my goals with keyed state and need operator state instead.
|
Hi Salva

Sorry for missing your recent reply.

If you just want the models to be recoverable, you should choose operator state to store the "models". If you stick to keyed state, I cannot see why these models are related to the current processing key. As you can see, "models" is just a HashMap[(String, String), Model], and I don't know why we would need to couple all models to just one specific key.
Best
Yun Tang
From: Salva Alcántara <[hidden email]>
Sent: Sunday, April 5, 2020 20:22
To: [hidden email] <[hidden email]>
Subject: Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?
|
Hi Yun, thanks for your reply. I agree with you, I will switch to operator
state.
|
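For readers wondering what the switch to operator state might look like, here is a minimal sketch, not the code the poster ended up with. It assumes Flink is on the classpath and that `Model` is `Serializable`; the `Model` trait, the `ModelCheckpointing` trait, and the `toBytes`/`fromBytes` helpers (plain Java serialization standing in for the thread's `model.toBytes`) are all hypothetical names introduced for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// Hypothetical model type; the thread never shows the real `Model` class.
trait Model extends Serializable

// Hypothetical mix-in for the operator from the thread, replacing the keyed
// MapState with operator ListState.
trait ModelCheckpointing extends CheckpointedFunction {

  // Live models, updated by processElement1/processElement2.
  @transient protected var models: mutable.HashMap[(String, String), Model] = _

  // Operator state: one serialized (key, model) pair per list element.
  @transient private var modelsBytes: ListState[Array[Byte]] = _

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Operator state has no current key, so clear() empties the whole list.
    modelsBytes.clear()
    for (entry <- models) modelsBytes.add(toBytes(entry))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Array[Byte]]("modelsBytes", classOf[Array[Byte]]))
    models = mutable.HashMap.empty
    if (context.isRestored) {
      // Rebuild the in-memory map from the restored list elements.
      for (bytes <- modelsBytes.get().asScala) models += fromBytes(bytes)
    }
  }

  // Assumption: Java serialization is good enough for this illustration.
  private def toBytes(entry: ((String, String), Model)): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(entry) finally out.close()
    buffer.toByteArray
  }

  private def fromBytes(bytes: Array[Byte]): ((String, String), Model) = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[((String, String), Model)] finally in.close()
  }
}
```

With this shape the models are serialized only when a checkpoint is taken, and `clear` and `add` do not depend on any current key. One caveat worth keeping in mind: elements of an operator list state are redistributed across subtasks when the job is restored with a different parallelism, and that redistribution is not aligned with the key partitioning of the stream, so a restored model is not guaranteed to land on the subtask that receives its key.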