StateMigrationException when switching from TypeInformation.of to createTypeInformation

Elias Levy
During some refactoring we changed a job using managed state from:

new ListStateDescriptor("config", TypeInformation.of(new TypeHint[ConfigState]() {}))

to

new ListStateDescriptor("config", createTypeInformation[ConfigState])

After this change, Flink refused to start the new job from a savepoint or checkpoint, raising StateMigrationException instead.

Why is Flink raising this error? Both TypeInformation.of and createTypeInformation return a TypeInformation[ConfigState], so why does it think the state type has changed?

Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

Till Rohrmann
Hi Elias,

could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})` and `createTypeInformation[ConfigState]` return the same `TypeInformation` subtype? The problem is that the former goes through the Java `TypeExtractor`, whereas the latter goes through the Scala `TypeUtils#createTypeInfo`, where the resulting `TypeInformation` is created via Scala macros. It must be the case that the Scala `TypeUtils` generate a different `TypeInformation` (e.g. Java generating a `GenericTypeInfo` whereas Scala generates a `TraversableTypeInfo`). A different `TypeInformation` means a different serializer for the state, which is incompatible with the serializer recorded in the savepoint, hence the `StateMigrationException`.
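One way to run this check is to build both `TypeInformation` instances side by side and print their classes and the serializers they create. This is a sketch assuming a Flink 1.5/1.6-era dependency with the Scala API (the `org.apache.flink.api.scala._` import provides `createTypeInformation`); the object name `CompareTypeInfo` and the simplified `ConfigState` alias (with `String` standing in for `LazyObject`) are hypothetical.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
import scala.collection.mutable

object CompareTypeInfo {
  // Hypothetical stand-in for ConfigState; the real value type is (LazyObject, Array[Byte]).
  type ConfigState = mutable.Map[String, (String, Array[Byte])]

  // Java path: the TypeExtractor reflects on the anonymous TypeHint's generic signature.
  val viaJava: TypeInformation[ConfigState] =
    TypeInformation.of(new TypeHint[ConfigState]() {})

  // Scala path: the createTypeInformation macro analyzes the type at compile time.
  val viaScala: TypeInformation[ConfigState] = createTypeInformation[ConfigState]

  // The serializer is derived from the TypeInformation; on restore it must be
  // compatible with the serializer recorded for the state in the savepoint.
  def serializerClass(ti: TypeInformation[ConfigState]): Class[_] =
    ti.createSerializer(new ExecutionConfig).getClass

  def main(args: Array[String]): Unit = {
    for ((label, ti) <- Seq("TypeInformation.of" -> viaJava, "createTypeInformation" -> viaScala)) {
      println(s"$label: ${ti.getClass.getName} (generic: ${ti.isInstanceOf[GenericTypeInfo[_]]})")
      println(s"  toString:   $ti")
      println(s"  serializer: ${serializerClass(ti).getName}")
    }
  }
}
```

With `mutable.Map`, which the Java extractor cannot analyze, the first path would be expected to fall back to a Kryo-backed `GenericTypeInfo`, while the macro produces a traversable type info with its own serializer.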

It would also be helpful if you could share the definition of `ConfigState` in order to test it ourselves.

Cheers,
Till

Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

Elias Levy
Apologies for the delay.  I've been traveling.

On Mon, Jul 9, 2018 at 8:44 AM Till Rohrmann <[hidden email]> wrote:
> could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})` and `createTypeInformation[ConfigState]` return the same `TypeInformation` subtype? The problem is that the former goes through the Java TypeExtractor whereas the latter goes through the Scala `TypeUtils#createTypeInfo` where the resulting `TypeInformation` is created via Scala macros. It must be the case that the Scala `TypeUtils` generate a different `TypeInformation` (e.g. Java generating a GenericTypeInfo whereas Scala generates a TraversableTypeInfo).

TypeInformation.of returns a GenericTypeInfo, and its toString reports it as GenericType<scala.collection.mutable.Map>.

createTypeInformation returns an instance of an anonymous class, whose toString reports it as interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2: scala.Tuple2(_1: GenericType<me.doubledutch.lazyjson.LazyObject>, _2: byte[]))].

Looks like you are correct about the Java version using GenericTypeInfo. I suppose that, if we want to move over to createTypeInformation, the only way around this is to release a job that supports both types, migrate the state from the old type to the new one, and then drop support for the old state. Yes?

> It would also be helpful if you could share the definition of `ConfigState` in order to test it ourselves.

`ConfigState` is defined as `type ConfigState = mutable.Map[String, ConfigStateValue]`, and `ConfigStateValue` is defined as `type ConfigStateValue = (LazyObject, Array[Byte])`. `LazyObject` is from the Doubledutch LazyJSON package.

Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

Till Rohrmann
Hi Elias,

I think introducing a new state and then deprecating the old one is currently the only way to solve this problem.
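The migration step could look roughly like this sketch. It assumes `"config"` is operator (non-keyed) list state registered from a `CheckpointedFunction`; keyed state would need a per-key variant. `ConfigMigratingMapper`, the new state name `"config-v2"` and the simplified `ConfigState` alias are hypothetical, and a Flink 1.x dependency with the Scala API is assumed.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import scala.collection.mutable

object ConfigStateMigration {
  // Hypothetical stand-in for the real ConfigState alias.
  type ConfigState = mutable.Map[String, (String, Array[Byte])]

  // Hypothetical operator that carries the state across the serializer change.
  class ConfigMigratingMapper extends MapFunction[String, String] with CheckpointedFunction {
    @transient private var configState: ListState[ConfigState] = _

    override def initializeState(context: FunctionInitializationContext): Unit = {
      val store = context.getOperatorStateStore

      // Old state: same name and same Java-extracted type info as before,
      // so the serializer matches the savepoint and the restore succeeds.
      val oldState = store.getListState(new ListStateDescriptor[ConfigState](
        "config", TypeInformation.of(new TypeHint[ConfigState]() {})))

      // New state under a new name, typed via the Scala macro.
      configState = store.getListState(new ListStateDescriptor[ConfigState](
        "config-v2", createTypeInformation[ConfigState]))

      // Copy whatever was restored under the old serializer, then clear the
      // old state so the next savepoint carries the data only in "config-v2".
      val restored = oldState.get().iterator()
      while (restored.hasNext) configState.add(restored.next())
      oldState.clear()
    }

    // Operator list state is snapshotted by Flink itself; nothing extra to do.
    override def snapshotState(context: FunctionSnapshotContext): Unit = ()

    override def map(value: String): String = value
  }
}
```

Once a savepoint has been taken with this version, the old `"config"` state is empty, and a later release can stop registering it.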

The community is currently working on supporting state evolution [1]. With this feature it should be possible to change serializers between two savepoints. Unfortunately, the feature could not be completed for Flink 1.6, but I think it will land in master soon.


Cheers,
Till
