State incompatible
Posted by
avilevi on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/State-incompatible-tp28748.html
Hi,
I added a ttl to my state
old version :
private lazy val stateDescriptor = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
vs the new version
@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter()
.build()
private lazy val stateDescriptor = {
val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
des.enableTimeToLive(storeTtl)
des
}
BUT when trying to restore from savepoint I am getting this error:
java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
...
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more
Do you have any idea how can I resolve it ?
Best wishes