During some refactoring we changed a job using managed state from a descriptor whose type information came from TypeInformation.of to ListStateDescriptor("config", createTypeInformation[ConfigState]). After this change, Flink refused to start the new job from a savepoint or checkpoint, raising StateMigrationException instead. Why is Flink raising this error? Both TypeInformation.of and createTypeInformation return TypeInformation[ConfigState], so why does it think the state type has changed?
Hi Elias,

could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})` and `createTypeInformation[ConfigState]` return the same `TypeInformation` subtype? The former goes through the Java `TypeExtractor`, whereas the latter goes through the Scala `TypeUtils#createTypeInfo`, where the resulting `TypeInformation` is created via Scala macros. The Scala `TypeUtils` must be generating a different `TypeInformation` (e.g. Java generating a `GenericTypeInfo` whereas Scala generates a `TraversableTypeInfo`). It would also help if you could share the definition of `ConfigState` so that we can test it ourselves.

Cheers,
Till

On Fri, Jul 6, 2018 at 11:31 PM Elias Levy <[hidden email]> wrote:
Apologies for the delay. I've been traveling.
On Mon, Jul 9, 2018 at 8:44 AM Till Rohrmann <[hidden email]> wrote:
TypeInformation.of returns a GenericTypeInfo, and toString reports it as GenericType<scala.collection.mutable.Map>. createTypeInformation returns an anonymous class, but toString reports it as interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2: scala.Tuple2(_1: GenericType<me.doubledutch.lazyjson.LazyObject>, _2: byte[]))]. Looks like you are correct about the Java version using GenericTypeInfo. I suppose the only way around this, if we wanted to move over to createTypeInformation, is to release a job that supports both types and upgrades the state from one to the other, then drop support for the older state. Yes?
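The "support both types, then drop the old one" idea above could look roughly like the sketch below. It assumes the state is operator list state accessed through CheckpointedFunction (the thread does not say which kind of managed state the job uses). The state name "config-v2", the class MigratingFunction, and LazyObjectStandIn (standing in for the LazyJSON class) are hypothetical names chosen for illustration.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

import scala.collection.JavaConverters._
import scala.collection.mutable

object ConfigStateTypes {
  // Hypothetical stand-in for me.doubledutch.lazyjson.LazyObject.
  final class LazyObjectStandIn(val json: String)

  type ConfigStateValue = (LazyObjectStandIn, Array[Byte])
  type ConfigState = mutable.Map[String, ConfigStateValue]
}

// Transitional release: reads the old state if present, writes only the new one.
class MigratingFunction extends RichMapFunction[String, String] with CheckpointedFunction {
  import ConfigStateTypes._

  @transient private var oldState: ListState[ConfigState] = _
  @transient private var newState: ListState[ConfigState] = _
  @transient private var config: ConfigState = _

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    val store = ctx.getOperatorStateStore

    // Unchanged descriptor: same name and Java-derived type info as before.
    oldState = store.getListState(
      new ListStateDescriptor[ConfigState](
        "config", TypeInformation.of(new TypeHint[ConfigState]() {})))

    // New descriptor under a different (assumed) name, using the Scala macro.
    newState = store.getListState(
      new ListStateDescriptor[ConfigState](
        "config-v2", createTypeInformation[ConfigState]))

    // Prefer the new state; fall back to the old one on the first restore.
    val fromNew = newState.get().asScala.toList
    val restored = if (fromNew.nonEmpty) fromNew else oldState.get().asScala.toList

    config = mutable.Map.empty
    restored.foreach(config ++= _)
  }

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    // From here on, only the new state carries data.
    oldState.clear()
    newState.clear()
    newState.add(config)
  }

  // Placeholder for the job's real processing logic.
  override def map(value: String): String = value
}
```

Once a savepoint has been taken from this transitional job, a later release could remove the "config" descriptor and keep only the new one.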
ConfigState is defined as type ConfigState = mutable.Map[String,ConfigStateValue] and ConfigStateValue is defined as type ConfigStateValue = (LazyObject,Array[Byte]). LazyObject is from the Doubledutch LazyJSON package.
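With those definitions, the check Till asked for can be reproduced along these lines. This is only a sketch: LazyObjectStandIn is a hypothetical placeholder for the LazyJSON class, and createSerializer(ExecutionConfig) is the Flink 1.x signature.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._

import scala.collection.mutable

object CompareTypeInfo {
  // Hypothetical stand-in for me.doubledutch.lazyjson.LazyObject.
  final class LazyObjectStandIn(val json: String)

  type ConfigStateValue = (LazyObjectStandIn, Array[Byte])
  type ConfigState = mutable.Map[String, ConfigStateValue]

  // Derived by the Java TypeExtractor.
  val viaJava: TypeInformation[ConfigState] =
    TypeInformation.of(new TypeHint[ConfigState]() {})

  // Derived at compile time by the Scala macro.
  val viaScala: TypeInformation[ConfigState] =
    createTypeInformation[ConfigState]

  def main(args: Array[String]): Unit = {
    val cfg = new ExecutionConfig

    // Both values have the same static type, but the runtime classes
    // and the serializers they create are what matter on restore.
    println(s"Java : ${viaJava.getClass.getName} / $viaJava")
    println(s"Scala: ${viaScala.getClass.getName} / $viaScala")
    println(s"Java serializer : ${viaJava.createSerializer(cfg).getClass.getName}")
    println(s"Scala serializer: ${viaScala.createSerializer(cfg).getClass.getName}")
    println(s"equal: ${viaJava == viaScala}")
  }
}
```

If the two serializer classes differ, state written with one cannot be read back with the other, which is consistent with the StateMigrationException described at the top of the thread.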
Hi Elias,

I think introducing a new state and then deprecating the old one is currently the only way to solve this problem. The community is currently working on supporting state evolution [1]. With this feature it should be possible to change serializers between two savepoints. Unfortunately, the feature could not be completed for Flink 1.6, but I think it will be in master soon.

Cheers,
Till

On Sun, Jul 15, 2018 at 12:11 AM Elias Levy <[hidden email]> wrote: