How to make savepoints more robust in the face of refactorings ?

jelmer
Changing the class that operators are nested in can break compatibility with existing savepoints. The following piece of code demonstrates this:


If I change `Operators` in this file to `Operators2`, I will not be able to recover from a savepoint that was made when this class still had its old name.

The error in the Flink UI will be:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
... 6 more

But the real reason is found in the task manager logs:


2018-01-28 17:03:58,830 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)
... 17 more
Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
... 22 more



Is there any way to make this code more robust? Using Java serialization in this way feels very brittle in the face of refactorings.

Re: How to make savepoints more robust in the face of refactorings ?

Tzu-Li (Gordon) Tai
Hi,

In the Scala API, type serializers may be anonymous classes generated by Scala macros, and these therefore contain a reference to the enclosing class (i.e., your `Operators` class).
Flink currently writes the serializers themselves into the savepoint, to be used for deserialization on restore, so their classes must be present at restore time. Changing the `Operators` class name means the previous anonymous serializer class is no longer on the classpath, and deserialization of the written serializer therefore fails.
This is a limitation of how serializers for written state are currently registered in Flink.
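To illustrate the naming issue described above, here is a minimal sketch that does not use Flink at all. `AnonNameDemo` and its `Serializer` trait are hypothetical stand-ins (not Flink classes); the two nested classes mirror the `Operators` to `Operators2` rename from the original post. It only shows that the JVM name of an anonymous class is derived from the class it is defined in, which is why a rename changes the name that Java serialization recorded.

```scala
object AnonNameDemo {
  // Hypothetical stand-in for a serializer type; this is not a Flink class.
  trait Serializer extends java.io.Serializable

  // The anonymous class below is compiled as a class nested in Operators.
  class Operators {
    val serializer: Serializer = new Serializer {}
  }

  // Same body, different enclosing class: the anonymous class gets a different name.
  class Operators2 {
    val serializer: Serializer = new Serializer {}
  }

  def main(args: Array[String]): Unit = {
    // Each printed name embeds the name of the enclosing class.
    println(new Operators().serializer.getClass.getName)
    println(new Operators2().serializer.getClass.getName)
  }
}
```

The exact suffixes (such as `$$anon$1`) depend on the compiler, but the enclosing class name is part of the result in both cases, so a savepoint written with the first name cannot be resolved once only the second exists.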

Generally speaking, to overcome this, you need to keep the previous serializer class on the classpath when restoring; it can only be removed completely from user code once the migration is complete.

One thing I’m not completely certain about yet is where in your demonstrated code an anonymous-class serializer is generated for some type.
From what I see, there shouldn’t be any anonymous-class serializers in that code. Is the code you provided a “simplified” version of the actual code in which you observed the restore error?

Cheers,
Gordon


On 28 January 2018 at 6:00:32 PM, jelmer ([hidden email]) wrote:

Re: How to make savepoints more robust in the face of refactorings ?

jelmer
> One thing that I’m not completely certain with yet, is where in your demonstrated code an anonymous-classed serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for the code. Is the code you provided a “simplified” version of the actual code in which you observed the restore error?

I had the same problem: I couldn't easily figure out where that anonymous serializer was coming from. That also makes it rather difficult to package an old version of this class along with a new job in case of refactorings.

While I initially observed this problem in a more complex job, the behavior can be reproduced 100% of the time with just the example job in the gist.


On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Re: How to make savepoints more robust in the face of refactorings ?

jelmer
In reply to this post by Tzu-Li (Gordon) Tai
I looked into it a little more. The anonymous-classed serializer is being created here


So far, the only strategy I have found for making it less likely to break is defining the TypeInformation in a trait, like so, and mixing it into the operators:


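import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation

// The macro-generated anonymous classes are now nested in this trait, not in the operator class.
trait Tuple2TypeInformation {
  implicit val tuple2TypeInformation: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]
}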

Then the inner class that's generated will be something like Tuple2TypeInformation$$anon$2$$anon$1 instead of com.ecg.foo.Main$Operators$$anon$3$$anon$1, and as long as you don't rename Tuple2TypeInformation or move it around, everything will work. But it feels very suboptimal.
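The effect of the trait workaround can be sketched without Flink. In this illustrative example, `TraitAnonDemo`, `Serializer`, and `StableTypeInfo` are hypothetical names (none of them are Flink classes): the anonymous class is defined once inside the trait, so every class that mixes the trait in ends up with an instance of the same class, whatever the mixing class is called.

```scala
object TraitAnonDemo {
  // Hypothetical stand-in for a serializer type; this is not a Flink class.
  trait Serializer extends java.io.Serializable

  // The anonymous class is defined here, so its name is derived from this trait.
  trait StableTypeInfo {
    val serializer: Serializer = new Serializer {}
  }

  // Renaming these classes does not affect the anonymous class defined in the trait.
  class Operators extends StableTypeInfo
  class Operators2 extends StableTypeInfo

  def main(args: Array[String]): Unit = {
    val first = new Operators().serializer.getClass.getName
    val second = new Operators2().serializer.getClass.getName
    println(first)
    println(second)
    // Both instances come from the same anonymous class.
    println(first == second)
  }
}
```

The fragility is moved rather than removed: the name now depends on the trait (and on the order of anonymous classes inside it), so the trait itself has to stay stable.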




Re: How to make savepoints more robust in the face of refactorings ?

Tzu-Li (Gordon) Tai
Ah, I see. Yes, it seems that serializers for Scala tuples are generated anonymous classes.
Since your window operator uses reducing state, the state type is the same as the input type of the window (which in your case is a Scala 2-tuple).

In general, using Scala collections, case classes, and tuples (which are treated similarly to case classes) will result in anonymous-class serializers.

For now, for more robustness w.r.t. savepoint migrations, I would suggest avoiding those types.
For example, if you use a POJO as the window input type [1], Flink's PojoSerializer will be used.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/types_serialization.html#rules-for-pojo-types
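As a rough sketch of the POJO suggestion, the `(String, Int)` tuple could be replaced by a plain top-level class. The class name `WordCount`, its fields, and the `merge` helper are hypothetical and chosen only for illustration; the example uses no Flink classes and only demonstrates the shape that the POJO rules in [1] ask for (a public standalone class, a public no-argument constructor, and fields reachable through getters and setters). Whether Flink actually picks the PojoSerializer for it is an assumption that should be verified against the job.

```scala
import scala.beans.BeanProperty

// Hypothetical replacement for the (String, Int) tuple.
// @BeanProperty generates Java-bean style getters and setters (getWord, setWord, ...).
class WordCount(@BeanProperty var word: String, @BeanProperty var count: Int) {
  // The POJO rules require a public no-argument constructor.
  def this() = this("", 0)

  override def toString: String = s"WordCount($word, $count)"
}

object WordCountDemo {
  // A reduce-style function of the kind a window's reducing state would apply.
  def merge(a: WordCount, b: WordCount): WordCount =
    new WordCount(a.word, a.count + b.count)

  def main(args: Array[String]): Unit = {
    println(merge(new WordCount("flink", 1), new WordCount("flink", 2)))
    // The no-argument constructor can be found and invoked reflectively.
    println(classOf[WordCount].getConstructor().newInstance())
  }
}
```

Because `WordCount` is an ordinary named class, its name does not depend on whichever class the operators happen to be nested in.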

On 30 January 2018 at 12:25:34 PM, jelmer ([hidden email]) wrote:

I looked into it a little more. The anonymous-classed serializer is being created here


So far the only strategy for making it less likely to break is defining the Typeinformation in a trait like so and mixing it into to the operators


trait Tuple2TypeInformation {
  implicit val tuple2TypeInformation: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]
}

Then the inner class thats generated will be something like Tuple2TypeInformation$$anon$2$$annon$1 instead of com.ecg.foo.Main$Operators$$anon$3$$anon$1 and as long you don't rename this Tuple2TypeInformation around everything will work.. but it feels very suboptimal. 



On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

In the Scala API, type serializers may be anonymous classes generated by Scala macros, and would therefore contain a reference to the wrapping class (i.e., your `Operators` class).
Since Flink currently serializes serializers into the savepoint to be used for deserialization on restore, and the fact that they must be present at restore time, changing the `Operators` classname would result in the previous anonymous class serializer to no longer be in the classpath and therefore fails the deserialization of the written serializer.
This is a limitation caused by how registering serializers for written state currently works in Flink.
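
The failure mode described above can be reproduced with plain Java serialization. This is a minimal sketch and not Flink's actual restore code: RenameSketch, write and canRestore are hypothetical names, and the "rename" is simulated by refusing to resolve one class name while reading.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class RenameSketch {

    // Stand-in for a generated serializer: an anonymous class whose
    // binary name is derived from the enclosing class.
    static Serializable anonymousSerializer() {
        return new Serializable() {};
    }

    // "Savepoint": the serializer object itself is written with Java serialization.
    static byte[] write(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // "Restore": pretend the class named missingClass is no longer on the classpath.
    static boolean canRestore(byte[] data, String missingClass) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().equals(missingClass)) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException | IOException e) {
            // The written class name can no longer be resolved.
            return false;
        }
    }
}
```

Writing RenameSketch.anonymousSerializer() and then calling canRestore with that object's own class name returns false, while calling it with an unrelated class name returns true. The stream only stores the class name, so the old class has to be loadable under that exact name.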

Generally speaking, to overcome this, you need to keep the previous serializer class on the classpath when restoring; it can only be removed from user code once the migration is complete.

One thing I'm not completely certain about yet is where in your demonstrated code an anonymous-class serializer is generated for some type.
From what I can see, there shouldn't be any anonymous-class serializers in that code. Is the code you provided a "simplified" version of the actual code in which you observed the restore error?

Cheers,
Gordon


On 28 January 2018 at 6:00:32 PM, jelmer ([hidden email]) wrote:

Changing the class that operators are nested in can break compatibility with existing savepoints. The following piece of code demonstrates this.


If I change Operators in this file to Operators2, I will not be able to recover from a savepoint that was made when this class still had its old name.

The error in the Flink UI will be:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
... 6 more

But the real reason is found in the task manager logs:


2018-01-28 17:03:58,830 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)
... 17 more
Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
... 22 more



Is there any way to make this code more robust? Using Java serialization in this way feels very brittle in the face of refactorings.