On 28 January 2018 at 6:00:32 PM, jelmer ([hidden email]) wrote:
Changing the class operators are nested in can break compatibility with existing savepoints. The following piece of code demonstrates thisIf I change Operators in this file to Operators2 i will not be able to recover from a savepoint that was made when this class still had its old name.The error in the flink ui will bejava.lang.IllegalStateException: Could not initialize keyed state backend.at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)at java.lang.Thread.run(Thread.java:745)Caused by: java.lang.NullPointerExceptionat org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)... 6 moreBut the real reason is found in the task manager logs2018-01-28 17:03:58,830 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.java.io.IOException: Unloadable class for type serializer.at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)at java.lang.Thread.run(Thread.java:745)Caused by: java.io.InvalidClassException: failed to read class descriptorat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)... 17 moreCaused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1at java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:348)at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)... 22 moreIs there any way to make this code more robust ? Using java serialization in this way feels very brittle in the face of refactorings.
Free forum by Nabble | Edit this page |