Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
Given:


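```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction

import scala.collection.mutable.HashMap // assuming the mutable variant

class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Byte]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Key and value types must match the declared MapState type
    modelsBytes = context.getKeyedStateStore.getMapState[(String, String), Array[Byte]](
      new MapStateDescriptor("modelsBytes", classOf[(String, String)], classOf[Array[Byte]])
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }

}
```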

It happens that `modelsBytes.clear()` raises an exception when there is no active key. This happens when I start the application from scratch without any data on the input streams. So, when the time for a checkpoint comes, I get this error:

`java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.`

However, when the input streams contain data, checkpoints work just fine. This confuses me, because `snapshotState` does not provide a keyed context (unlike `processElement1` and `processElement2`, where the current key is accessible via `ctx.getCurrentKey`). It seems to me that the calls to `clear` and `put` within `snapshotState` should always fail, since they are supposed to work only within a keyed context. Can anyone clarify whether this is actually the expected behaviour?

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Congxian Qiu
Hi

The exception `No key set. This method should not be called outside of a keyed context.` means that the key currently passed in is null. In my opinion, something is wrong here if an exception is thrown when no data has arrived. Could you please share the full stack trace and a minimal reproducible job for this issue?

Best,
Congxian


In reply to Salva Alcántara <[hidden email]>, who wrote on Sunday, December 1, 2019 at 3:01 PM.

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Yun Tang

Hi Salva

The root cause is that keyed state and operator state are being mixed up.

There is no ‘currentKey’ in operator state, which means PartitionableListState#clear() clears the whole state. Keyed state, however, is always accessed through a ‘currentKey’, which means ‘state#clear()’ only removes the entry scoped to the current runtime key. In your example code, the state to clear is a MapState (not a list state) and therefore must be keyed state. If your job has not processed any record, no ‘currentKey’ has been set [1] for the ‘modelsBytes’ state, which leads to the NPE when calling ‘state#clear()’.
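
To make the scoping concrete, here is a toy, Flink-free model of a keyed map state. `ToyKeyedBackend` and `ToyMapState` are hypothetical names used only for illustration; they are not Flink classes. They only mimic the behaviour described above: every access is resolved against the backend's current key and fails if none has been set.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed state backend (not a Flink class): it only
// remembers the key of the record that is currently being processed.
final class ToyKeyedBackend[K] {
  private var currentKey: Option[K] = None

  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  def requireKey(): K = currentKey.getOrElse(
    throw new NullPointerException(
      "No key set. This method should not be called outside of a keyed context."))
}

// Toy map state: one inner map per record key, always resolved through
// the backend's current key.
final class ToyMapState[K, UK, UV](backend: ToyKeyedBackend[K]) {
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]

  def put(k: UK, v: UV): Unit =
    perKey.getOrElseUpdate(backend.requireKey(), mutable.Map.empty[UK, UV]).update(k, v)

  // Removes only the entries scoped to the current key, not the whole state.
  def clear(): Unit = {
    perKey.remove(backend.requireKey())
    ()
  }

  def entriesFor(key: K): Map[UK, UV] =
    perKey.get(key).map(_.toMap).getOrElse(Map.empty[UK, UV])
}
```

In this sketch, calling `clear()` before any `setCurrentKey` throws the same kind of NullPointerException as in the question, and calling it after `setCurrentKey("b")` leaves the entries stored under key `"a"` untouched.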

 

Moreover, the ‘snapshotState’ and ‘initializeState’ interfaces are mainly used to snapshot and initialize operator state.

Last but not least, even if you could ensure that at least one record is processed before ‘snapshotState’ is called, the program logic would still be unclear: you cannot control which entry in your state is cleared, because you cannot control the current key, which is set by record processing.

You could refer to TwoPhaseCommitSinkFunction [2] to see what kind of state can be cleared during snapshotState.

[1] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java#L136

[2] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L324

Best
Yun Tang

 

 

In reply to Congxian Qiu <[hidden email]>, who wrote on Monday, December 2, 2019 at 10:41 AM.

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
In reply to this post by Congxian Qiu
Thanks Congxian. From what I've read, it seems that using keyed state in
`snapshotState` is incorrect. What confuses me is that if I do something
like this

```
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    if (models.nonEmpty) {
      modelsBytes.clear() // This raises an exception when there is no active key set
      for ((k, model) <- models) {
        modelsBytes.put(k, model.toBytes(v))
      }
    }
  }
```

Then, when there is data (`models` is populated within `processElement1`),
the `clear` and subsequent calls to `put` work just fine. This seems like a
bug to me, as others have pointed out in this somewhat extended question
posted on Stack Overflow:

https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models

Do you think the fact that `clear` works within `snapshotState` under
certain circumstances is indeed a bug?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
In reply to this post by Yun Tang
Hi Yun,

Thanks for your reply. You mention that

" ‘snapshotState’ and ‘initializeState’ interfaces are used mainly to
snapshot and initialize for operator state"

but..."mainly" is not "exclusively" right? So, I guess my question tries to
figure out whether doing something like this is valid/makes sense?

```
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    if (models.nonEmpty) {
      modelsBytes.clear()
      for ((k, model) <- models) {
        modelsBytes.put(k, model.toBytes(v))
      }
    }
  }
```

Indeed, the above code seems to work well, so it seems like a bug that
`clear` sometimes works and sometimes does not, as I noted in my reply to
Congxian and as others have noted in this extended question posted on
Stack Overflow:

https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models




Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Yun Tang
Hi Salva

As I pointed out, the program logic is unclear if you call 'state.clear()' within 'snapshotState', because you do not know what the current key is. Hence, I don't think this approach makes sense.

From my point of view, the fact that 'clear' sometimes works in your code is not a bug in the current Flink framework. Currently, the current key is set when a record is processed. However, Flink does not reset the current key to null afterwards, since there is no such life cycle for the current key. There seems to be no benefit in introducing one, and it might cause a performance regression, as it would add extra steps.
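
The sequence of events described above can be sketched with a small, Flink-free timeline. `StaleKeyDemo` and its methods are hypothetical names, not Flink APIs, and the model payloads are dummy integers. The sketch only assumes what is stated above: the current key is set per record and never reset, so a write made at snapshot time lands under whichever key was processed last.

```scala
import scala.collection.mutable

// Toy timeline of one parallel subtask; none of these names are Flink APIs.
object StaleKeyDemo {
  // Set whenever a record is processed and, as described above, never reset.
  private var currentKey: Option[String] = None

  // Keyed map state, modelled as: record key -> (model id -> dummy payload).
  private val keyedState = mutable.Map.empty[String, Map[String, Int]]

  def processElement(recordKey: String): Unit =
    currentKey = Some(recordKey)

  // Mimics clear() followed by put(...) on keyed state inside snapshotState.
  def snapshot(models: Map[String, Int]): Unit = {
    val key = currentKey.getOrElse(
      throw new NullPointerException("No key set."))
    keyedState(key) = models // everything lands under the leftover key
  }

  def stateFor(recordKey: String): Map[String, Int] =
    keyedState.getOrElse(recordKey, Map.empty[String, Int])
}
```

With no record processed, `snapshot` throws. After `processElement("a")` followed by `processElement("b")`, a snapshot stores every model under `"b"`, while `stateFor("a")` stays empty.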

Best
Yun Tang


In reply to Salva Alcántara <[hidden email]>, who wrote on December 2, 2019 at 9:29 PM.

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
In reply to this post by Yun Tang
Hi Yun,

In the end, I left the code like this

```
override def snapshotState(context: FunctionSnapshotContext): Unit = {
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
}
```

I have verified with a simple test that checkpoints seem to work fine most
of the time. However, from time to time, the map state is not saved
properly (I get an empty map state). So it looks like updating the keyed
state like that within the `snapshotState` method is conceptually wrong;
indeed, this method does not receive any keyed context to start with.
Because of this, I think the user should not even be allowed to invoke
`put` (nor `clear`) on the map state object. That would help make things
less confusing.

The reason I am trying to serialize my (keyed state) models inside
`snapshotState` is that these models are self-evolving and possess their
own (time-varying) state; otherwise I could just serialize them once after
creation in the `processElement1` method. So, given this situation, how
could I handle my use case? Ideally, I should only serialize them when
checkpoints are taken; in particular, I want to avoid having to serialize
them after every element received in `processElement2` (the state of my
models changes with each new element processed there). Maybe I cannot
achieve my goals with keyed state and need operator state instead.




Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Yun Tang
Hi Salva

Sorry for missing your recent reply.
If you just want the models to be recoverable, you should choose operator state to store the "models". If you stick to keyed state, I cannot see how these models relate to the current processing key. As you can see, "models" is just a HashMap[(String, String), Model], and I don't see why all models should be coupled to just one specific key.
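
A minimal sketch of what the operator-state variant might look like follows. It assumes the operator from the original question. `ModelDef`, `Data`, `Prediction`, `Model`, `Model.fromBytes`, `update` and the fields used below are hypothetical stand-ins for the poster's types, which the thread does not show. The Flink calls (`getOperatorStateStore`, `getListState`, `ListStateDescriptor`, `TypeHint`) follow the pattern in Flink's documentation for `CheckpointedFunction`.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical stand-ins for the poster's types; only their shape matters.
case class ModelDef(id: String, version: String, payload: Array[Byte])
case class Data(modelId: String, modelVersion: String, features: Array[Double])
case class Prediction(value: Double)

trait Model {
  def toBytes: Array[Byte]
  def update(d: Data): Prediction // assumed to mutate the model's own state
}
object Model {
  def fromBytes(bytes: Array[Byte]): Model = ??? // placeholder deserializer
}

class MyOperator
    extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
    with CheckpointedFunction {

  // Live models, mutated on every element; serialized only at checkpoints.
  @transient private var models: mutable.HashMap[(String, String), Model] = _

  // Operator (non-keyed) list state: usable without a current key.
  @transient private var modelsBytes: ListState[((String, String), Array[Byte])] = _

  override def processElement1(
      value: ModelDef,
      ctx: KeyedCoProcessFunction[String, ModelDef, Data, Prediction]#Context,
      out: Collector[Prediction]): Unit =
    models((value.id, value.version)) = Model.fromBytes(value.payload)

  override def processElement2(
      value: Data,
      ctx: KeyedCoProcessFunction[String, ModelDef, Data, Prediction]#Context,
      out: Collector[Prediction]): Unit =
    models.get((value.modelId, value.modelVersion))
      .foreach(m => out.collect(m.update(value)))

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // clears this subtask's whole list; no key involved
    for ((k, model) <- models) modelsBytes.add((k, model.toBytes))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    models = mutable.HashMap.empty
    modelsBytes = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[((String, String), Array[Byte])](
        "modelsBytes",
        TypeInformation.of(new TypeHint[((String, String), Array[Byte])]() {})))

    if (context.isRestored) {
      for ((k, bytes) <- modelsBytes.get().asScala) models(k) = Model.fromBytes(bytes)
    }
  }
}
```

One caveat with this design: operator state belongs to a parallel subtask rather than to a key, so when the job is rescaled the list entries are redistributed among subtasks without regard to the stream's key partitioning. If every subtask must see all models after a restore, `getUnionListState` on the same store may be the better fit.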

Best
Yun Tang

In reply to Salva Alcántara <[hidden email]>, who wrote on Sunday, April 5, 2020 at 20:22.

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
Hi Yun, thanks for your reply. I agree with you, I will switch to operator
state.


