Hi,

I have an original job (say v1) and I want to start a new job (say v2) from a savepoint of v1. An operator of v1 used to have per-key state of a POJO type, but I want to remove that state together with the definition of the POJO type. When I started v2 from a savepoint of v1, I specified "--allowNonRestoredState", but I got the following exception:

2021-02-08 22:07:28,324 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from RUNNING to FAILED on container_e02_1607261469522_0242_01_000008 @ mobdata-flink-dn29.dakao.io (dataPort=45505).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    ... 9 more
Caused by: java.io.IOException: Could not find class 'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in classpath.
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    ... 9 more
Caused by: java.lang.ClassNotFoundException: com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_222]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_222]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_222]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_222]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_222]
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:754) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]

So I made two versions of v2:
1) v2_1: removes only the keyed state, without removing the definition of the POJO type. I managed to resume it from a savepoint of v1.
2) v2_2: removes both the keyed state and the definition of the POJO type. I hoped resuming it from a savepoint of v2_1 would succeed, but it fails with the same exception as above.

Q1) Why doesn't the "--allowNonRestoredState" option suppress the ClassNotFoundException?
Q2) Do I have to live forever with the definition of a POJO type that is no longer necessary?

Best,
Dongwon
Hi 张静,

> Q1: By default, a savepoint restore will try to match all state

Okay.

> Q2: Not really. After you recover new job from the savepoint

I did that, but it ends up with the same ClassNotFoundException :-(
What I did exactly is:
(1) Trigger sp1 from v1
(2) Start v2-1 (with the definition of the POJO, but not using it at all) from sp1
(3) Trigger sp2 from v2-1
(4) Start v2-2 (without the definition of the POJO) from sp2
(5) v2-2 fails with the same ClassNotFoundException regarding the POJO type

Should v2-2 successfully start from sp2?

Best,
Dongwon

On Mon, Feb 8, 2021 at 11:48 PM 张静 <[hidden email]> wrote:
> Hi, Dongwon,
Hi,

I'm pulling in Yun Tang, who is familiar with state backends and RocksDB in particular.

From what I see, the 2nd snapshot (sp2) is built using the same set of states obtained from the starting savepoint/checkpoint (sp1) to write its metadata. This metadata includes serializer snapshots, including the PojoSerializer snapshot for your custom type. On restore, this metadata is read and the POJO class itself is loaded.

I see the following ways to overcome this issue:
1. Use the State Processor API to create a new snapshot [1]
2. If the operator has only this state, then changing its uid (together with --allowNonRestoredState) should help
3. Probably just changing the POJO to an empty class will suffice in your case?

Regards,
Roman

On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <[hidden email]> wrote:
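To make the explanation above concrete, here is a toy model in Java. It is not Flink code: the class `RestoreModel`, the operator uid `my-operator-uid`, the state names and the `Set<String>` standing in for the classpath are all hypothetical. It encodes two points: `--allowNonRestoredState` only governs state whose operator can no longer be matched in the new job (which is why changing the uid, option 2, would help), while for a matched operator the serializer class of every state is resolved on restore and carried into the next savepoint, which is why sp2 still needs `IDataConverter$SpeedAndLoc`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Toy model (not Flink's implementation) of restoring keyed state from a savepoint.
 * A savepoint is modeled as: operator uid -> (state name -> serializer class name).
 * The classpath is modeled as the set of class names that can be resolved.
 */
class RestoreModel {

    /**
     * Restores a savepoint into a new job and returns the state the new job's backends
     * hold afterwards, which is also what the job's next savepoint would contain.
     */
    static Map<String, Map<String, String>> restore(
            Map<String, Map<String, String>> savepoint,
            Set<String> jobOperatorUids,
            Set<String> classpath,
            boolean allowNonRestoredState) throws ClassNotFoundException {
        Map<String, Map<String, String>> restored = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, String>> op : savepoint.entrySet()) {
            if (!jobOperatorUids.contains(op.getKey())) {
                // --allowNonRestoredState only decides what happens to state whose
                // operator uid cannot be matched in the new job.
                if (allowNonRestoredState) {
                    continue;
                }
                throw new IllegalStateException("Cannot map state of operator " + op.getKey());
            }
            // For a matched operator, the serializer class of every state is resolved,
            // including states the new code never registers.
            for (String className : op.getValue().values()) {
                if (!classpath.contains(className)) {
                    throw new ClassNotFoundException(className);
                }
            }
            // Unused states stay in the backend and end up in the next savepoint.
            restored.put(op.getKey(), new LinkedHashMap<>(op.getValue()));
        }
        return restored;
    }
}
```

In this model, restoring a hypothetical sp1 with v2-1's classpath succeeds and returns a map that still lists the POJO state; restoring that result with v2-2's classpath throws `ClassNotFoundException` even with `allowNonRestoredState = true`, mirroring steps (1) to (5) in the earlier message. Renaming the uid instead makes the restore skip the whole operator, including any state you wanted to keep.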
Hi Khachatryan,

Thanks for the explanation and the input!

> 1. Use the State Processor API to create a new snapshot [1]

I haven't used it, but does the API prevent the class of a specific serializer from being loaded?

> 2. If the operator has only this state then changing uid (together with allowNonRestoredState) should help

Very unfortunately, I have another per-key state defined on the operator which is very important and cannot be abandoned T.T

> 3. Probably just changing POJO to an empty class will suffice in your case?

Yeah, I might keep the class definition around for a while.

Best,
Dongwon

On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <[hidden email]> wrote:
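For option 3, the stack trace shows that the restore fails in `InstantiationUtil.resolveClassByName`, i.e. it needs `Class.forName` to resolve the binary name `com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc` (the `$` marks a nested class). Below is a minimal stub, assuming the original was a static nested class of `IDataConverter`. The thread only says an empty class will "probably" suffice, so treat this as untested. In the real job the stub would go into the existing `IDataConverter` in package `com.kakaomobility.drivinghabit.stream`; the default package is used here only so the snippet compiles on its own.

```java
// Hypothetical stand-in for the existing IDataConverter class. In the real job this
// lives in package com.kakaomobility.drivinghabit.stream and keeps its other members.
class IDataConverter {

    /**
     * Field-less stub kept only so that savepoint metadata written by older job
     * versions can still resolve the class name IDataConverter$SpeedAndLoc.
     * No state of this type is registered or used by the current job.
     */
    public static class SpeedAndLoc {
        public SpeedAndLoc() {
        }
    }
}
```

The stub keeps the name resolvable without keeping the old fields or logic; once a savepoint has been rewritten without this state, it should no longer be needed.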
Hi Dongwon,

With the State Processor API you should be able to create a new snapshot that doesn't reference the unused classes.

Regards,
Roman

On Tue, Feb 9, 2021 at 3:39 AM Dongwon Kim <[hidden email]> wrote:
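On the question whether the State Processor API prevents the class from being loaded: as far as I understand, reading the old savepoint still restores the operator's keyed backend, so the rewrite job presumably needs the POJO class on its classpath (for example, by running it with v2_1's jar). The gain is that the savepoint it writes no longer contains that state. In Flink 1.12 the steps would roughly be `Savepoint.load(...)`, `readKeyedState(...)` for the state to keep, `OperatorTransformation.bootstrapWith(...).keyBy(...).transform(...)`, and then `removeOperator(uid).withOperator(uid, ...).write(path)`; check the exact signatures against the State Processor API docs. Replacing the operator this way writes only the bootstrapped keyed state, so any broadcast state of this `CoBroadcastWithKeyedOperator` would likely have to be rebuilt from the broadcast stream. Since the real API needs a Flink dependency, the sketch below is only a self-contained toy model of the rewrite, not the API itself; the class `SavepointRewriteModel`, the state names and the `Set<String>` classpath are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Toy model (not the State Processor API) of rewriting one operator's keyed-state
 * metadata so that a state is dropped. Metadata: state name -> serializer class name.
 */
class SavepointRewriteModel {

    /** Resolves every state's serializer class, as a keyed-backend restore does. */
    static void checkRestorable(Map<String, String> meta, Set<String> classpath)
            throws ClassNotFoundException {
        for (String className : meta.values()) {
            if (!classpath.contains(className)) {
                throw new ClassNotFoundException(className);
            }
        }
    }

    /**
     * Reads the old metadata (which still requires the old class on the classpath)
     * and writes new metadata containing only the states listed in {@code keep}.
     */
    static Map<String, String> rewrite(Map<String, String> oldMeta, Set<String> keep,
                                       Set<String> classpathWithOldClass)
            throws ClassNotFoundException {
        checkRestorable(oldMeta, classpathWithOldClass);
        Map<String, String> newMeta = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : oldMeta.entrySet()) {
            if (keep.contains(e.getKey())) {
                newMeta.put(e.getKey(), e.getValue());
            }
        }
        return newMeta;
    }
}
```

In the model, `rewrite` succeeds only when the old class is available, and its result passes `checkRestorable` with a classpath that no longer contains the POJO class, which is the situation v2_2 needs.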