Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation
Posted by Till Rohrmann
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/StateMigrationException-when-switching-from-TypeInformation-of-to-createTypeInformation-tp21276p21560.html
Hi Elias,
I think introducing a new state and then deprecating the old one is currently the only way to solve this problem.
The community is currently working on supporting state evolution [1]. With this feature it should be possible to change serializers between two savepoints. Unfortunately, the feature could not be completed for Flink 1.6. But I think it will be in the master soon.
Cheers,
Till
Apologies for the delay. I've been traveling.
Could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})` and `createTypeInformation[ConfigState]` return the same `TypeInformation` subtype? The problem is that the former goes through the Java `TypeExtractor`, whereas the latter goes through the Scala `TypeUtils#createTypeInfo`, where the resulting `TypeInformation` is created via Scala macros. It must be the case that the Scala `TypeUtils` generates a different `TypeInformation` (e.g. Java generating a `GenericTypeInfo` whereas Scala generates a `TraversableTypeInfo`).
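One way to run this check is a small standalone program that builds both `TypeInformation` instances and prints their runtime class and `toString`. This is only a sketch: the object name `CompareTypeInfo` is made up, and it assumes the Flink Scala API and the LazyJSON library are on the classpath.

```scala
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._
import me.doubledutch.lazyjson.LazyObject
import scala.collection.mutable

// Hypothetical helper object, used only to compare the two code paths.
object CompareTypeInfo {
  type ConfigStateValue = (LazyObject, Array[Byte])
  type ConfigState = mutable.Map[String, ConfigStateValue]

  // Java path: the TypeHint is analysed by the Java TypeExtractor.
  val javaInfo: TypeInformation[ConfigState] =
    TypeInformation.of(new TypeHint[ConfigState]() {})

  // Scala path: the macro behind createTypeInformation builds the type info.
  val scalaInfo: TypeInformation[ConfigState] =
    createTypeInformation[ConfigState]

  def main(args: Array[String]): Unit = {
    println(s"Java : ${javaInfo.getClass.getName} -> $javaInfo")
    println(s"Scala: ${scalaInfo.getClass.getName} -> $scalaInfo")
    println(s"Same TypeInformation class: ${javaInfo.getClass == scalaInfo.getClass}")
  }
}
```

If the two classes differ, the serializers derived from them will most likely differ as well, which would explain why a savepoint written with one cannot be restored with the other.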
TypeInformation.of returns a GenericTypeInfo and toString reports it as GenericType<scala.collection.mutable.Map>.
createTypeInformation returns an anonymous class but toString reports it as interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2: scala.Tuple2(_1: GenericType<me.doubledutch.lazyjson.LazyObject>, _2: byte[]))].
Looks like you are correct about the Java version using GenericTypeInfo. I suppose the only way around this, if we wanted to move over to createTypeInformation, is to release a job that supports both types, upgrade the state from one to the other, and then drop support for the older state. Yes?
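A rough sketch of such a dual-state job follows. It assumes the state is keyed `ValueState`, which the thread does not confirm. The class name `MigratingFunction`, the `String` input and output types, and the state names `"config"` and `"config-v2"` are placeholders; the old descriptor would have to reuse whatever name the existing job registered.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import me.doubledutch.lazyjson.LazyObject
import scala.collection.mutable

object MigrationTypes {
  type ConfigStateValue = (LazyObject, Array[Byte])
  type ConfigState = mutable.Map[String, ConfigStateValue]
}

// Hypothetical keyed operator that reads the old state and writes the new one.
class MigratingFunction extends RichFlatMapFunction[String, String] {
  import MigrationTypes._

  @transient private var oldState: ValueState[ConfigState] = _
  @transient private var newState: ValueState[ConfigState] = _

  override def open(parameters: Configuration): Unit = {
    // Old state: same name and same (Java-extracted) TypeInformation as before,
    // so that it still matches what is stored in the savepoint.
    oldState = getRuntimeContext.getState(
      new ValueStateDescriptor[ConfigState](
        "config", TypeInformation.of(new TypeHint[ConfigState]() {})))
    // New state: a different name, registered with the Scala type information.
    newState = getRuntimeContext.getState(
      new ValueStateDescriptor[ConfigState](
        "config-v2", createTypeInformation[ConfigState]))
  }

  // Returns the state for the current key, moving it to the new state if needed.
  private def loadConfig(): ConfigState = {
    val migrated = newState.value()
    if (migrated != null) {
      migrated
    } else {
      val legacy = oldState.value()
      val config =
        if (legacy != null) legacy
        else mutable.Map.empty[String, ConfigStateValue]
      newState.update(config)
      oldState.clear() // the old entry for this key is no longer needed
      config
    }
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    val config = loadConfig()
    // ... application logic that reads or modifies config would go here ...
    newState.update(config)
    out.collect(s"$value: ${config.size} config entries")
  }
}
```

Note that this migrates lazily, one key at a time as records arrive, so the old state can only be dropped in a later release once every key that still holds old state has been processed at least once.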
It would also be helpful if you could share the definition of `ConfigState` in order to test it ourselves.
ConfigState is defined as `type ConfigState = mutable.Map[String, ConfigStateValue]`, and ConfigStateValue is defined as `type ConfigStateValue = (LazyObject, Array[Byte])`. LazyObject is from the Doubledutch LazyJSON package.