State incompatible

Posted by avilevi on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/State-incompatible-tp28748.html

Hi,
I added a ttl to my state 
old version :
 private lazy val stateDescriptor = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])

vs the new version 

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

  private lazy val stateDescriptor = {
    val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
    des.enableTimeToLive(storeTtl)
    des
  }

BUT when trying to restore from savepoint I am getting this error:

java.lang.RuntimeException: Error while getting state
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
	...

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
	... 11 more

Do you have any idea how can I resolve it ? 

Best wishes