Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException


Dongwon Kim-2
Hi,

I have an original job (say v1) and I want to start a new job (say v2) from a savepoint of v1.

An operator of v1 has per-key state of a POJO type, and I want to remove that state together with the definition of the POJO type.

When I started v2 from a savepoint of v1 with "--allowNonRestoredState", I got the following exception:

2021-02-08 22:07:28,324 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from RUNNING to FAILED on container_e02_1607261469522_0242_01_000008 @ mobdata-flink-dn29.dakao.io (dataPort=45505).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more
Caused by: java.io.IOException: Could not find class 'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in classpath.
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more
Caused by: java.lang.ClassNotFoundException: com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_222]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_222]
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_222]
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Class.forName0(Native Method) ~[?:1.8.0_222]
	at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_222]
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:754) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more

So I made two versions of v2:
1) v2_1: removes only the keyed state, without removing the definition of the POJO type. It resumes successfully from a savepoint of v1.
2) v2_2: removes both the keyed state and the definition of the POJO type. I hoped it could resume from a savepoint of v2_1, but it fails with the same exception as above.

Q1) Why doesn't the "--allowNonRestoredState" option suppress ClassNotFoundException?
Q2) Do I have to live forever with the definition of the POJO type which is no longer necessary?

Best,

Dongwon


Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Dongwon Kim-2
Hi 张静,

>     Q1: By default, a savepoint restore will try to match all state
> back to the restored job. `allowNonRestoredState` does not prevent
> all of the state in the savepoint from being recovered; it only skips
> the requirement that every piece of restored state be matched back to
> the restored job. So the `ClassNotFoundException` cannot be avoided.
Okay.

>    Q2: Not really. After you restore the new job, based on (1), from the
> savepoint (savepoint1), you could take a new savepoint (savepoint2)
> and then remove the definition of the POJO type. Then you can restore
> from savepoint2.
I did that, but it ends up with the same ClassNotFoundException :-(

Exactly what I did:
(1) Trigger sp1 from v1
(2) Start v2-1 (with the definition of the POJO, but without using it at all) from sp1
(3) Trigger sp2 from v2-1
(4) Start v2-2 (without the definition of the POJO) from sp2
(5) v2-2 failed with the same ClassNotFoundException regarding the POJO type

Should v2-2 be able to start successfully from sp2?

Best,

Dongwon

On Mon, Feb 8, 2021 at 11:48 PM 张静 <[hidden email]> wrote:
Hi, Dongwon,
     Q1: By default, a savepoint restore will try to match all state
back to the restored job. `allowNonRestoredState` does not prevent
all of the state in the savepoint from being recovered; it only skips
the requirement that every piece of restored state be matched back to
the restored job. So the `ClassNotFoundException` cannot be avoided.
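To make the distinction in Q1 concrete, here is a toy Java model; it is not Flink's actual restore code. It assumes, as the stack trace suggests, that the keyed backend reads the serializer snapshot of every state stored for an operator that still exists in the new job, and that `allowNonRestoredState` only decides what happens to savepoint state whose operator uid no longer exists in the new job. The uids, state names and class names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Toy model of a savepoint restore, NOT Flink's implementation.
 * Assumed savepoint layout: operator uid -> (state name -> class name recorded
 * in that state's serializer snapshot).
 */
final class RestoreModel {

    /** Restores the savepoint and returns the uids of operators whose state was skipped. */
    static Set<String> restore(Map<String, Map<String, String>> savepoint,
                               Set<String> newJobOperatorUids,
                               boolean allowNonRestoredState) throws ClassNotFoundException {
        Set<String> skipped = new TreeSet<>();
        for (Map.Entry<String, Map<String, String>> op : savepoint.entrySet()) {
            String uid = op.getKey();
            if (!newJobOperatorUids.contains(uid)) {
                // Matching step: the only place where allowNonRestoredState matters.
                if (!allowNonRestoredState) {
                    throw new IllegalStateException("Cannot map savepoint state of operator " + uid);
                }
                skipped.add(uid); // this operator's state is never read
                continue;
            }
            // Reading step: the operator still exists, so the metadata of ALL of its
            // states is read, including states the new code no longer registers.
            for (String className : op.getValue().values()) {
                Class.forName(className); // a deleted POJO class fails here
            }
        }
        return skipped;
    }
}
```

Under these assumptions, keeping the operator uid means the deleted class is still resolved and the restore fails even with the flag set, whereas changing the uid together with the flag skips that operator's state entirely.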
     Q2: Not really. After you restore the new job, based on (1), from the
savepoint (savepoint1), you could take a new savepoint (savepoint2)
and then remove the definition of the POJO type. Then you can restore
from savepoint2.
Please correct me if I'm wrong. Thanks.

Best,
Beyond1920

On Mon, Feb 8, 2021 at 9:43 PM Dongwon Kim <[hidden email]> wrote:

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

r_khachatryan
Hi,

I'm pulling in Yun Tang, who is familiar with state backends and RocksDB in particular.

From what I can see, the second snapshot (sp2) writes its metadata using the same set of states that was obtained from the starting savepoint/checkpoint (sp1), even though v2-1 never uses the POJO state. This metadata includes the serializer snapshots, among them the PojoSerializer snapshot for your custom type. On restore, this metadata is read and the POJO class itself is loaded.
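
If Roman's reading is right, the failure in Dongwon's steps (1) to (5) below follows directly. State restored from sp1 stays in the keyed backend's metadata whether or not v2-1 touches it, so sp2 still lists the POJO serializer. The toy model below sketches this under explicit assumptions. It is not Flink code. `ToyKeyedBackend`, `CarryForwardDemo`, the state names and the `java.lang.Long` type of the kept state are hypothetical, and a set of class names stands in for the job jar's classpath.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model, not Flink code: the keyed backend of one operator remembers the
// metadata (state name -> serializer class name) of every state it restored.
final class ToyKeyedBackend {
    private final Map<String, String> stateMeta = new LinkedHashMap<>();

    // Restore resolves the class of every recorded state, used or not.
    void restore(Map<String, String> snapshotMeta, Set<String> jobClasspath)
            throws ClassNotFoundException {
        for (Map.Entry<String, String> e : snapshotMeta.entrySet()) {
            if (!jobClasspath.contains(e.getValue())) {
                throw new ClassNotFoundException(e.getValue());
            }
            stateMeta.put(e.getKey(), e.getValue());
        }
    }

    // Registering a state adds or updates its entry; entries are never dropped.
    void register(String stateName, String className) {
        stateMeta.put(stateName, className);
    }

    // A snapshot writes every entry, including states the job never touched.
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(stateMeta);
    }
}

final class CarryForwardDemo {
    static final String POJO =
            "com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc";
    static final String LONG = "java.lang.Long"; // hypothetical type of the state to keep

    // Steps (1)-(3): sp1 from v1, restored by v2-1 (POJO still on the classpath), then sp2.
    static Map<String, String> sp2FromV21() {
        Map<String, String> sp1 = new LinkedHashMap<>();
        sp1.put("importantState", LONG); // hypothetical state names
        sp1.put("speedAndLocState", POJO);
        ToyKeyedBackend v21 = new ToyKeyedBackend();
        try {
            v21.restore(sp1, new HashSet<>(Arrays.asList(LONG, POJO)));
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e); // not expected: v2-1 ships both classes
        }
        v21.register("importantState", LONG); // v2-1 only uses the state it keeps
        return v21.snapshot();
    }

    // Steps (4)-(5): v2-2 no longer ships the POJO class.
    static boolean v22FailsFrom(Map<String, String> sp2) {
        try {
            new ToyKeyedBackend().restore(sp2, new HashSet<>(Arrays.asList(LONG)));
            return false;
        } catch (ClassNotFoundException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> sp2 = sp2FromV21();
        System.out.println(sp2.containsKey("speedAndLocState")); // true
        System.out.println(v22FailsFrom(sp2));                  // true
    }
}
```

In this model, taking more savepoints from v2-1 would not change the result, because nothing ever removes the restored entry. The only ways out are to rewrite the snapshot, detach the state from the operator, or keep the class resolvable.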

I see the following ways to overcome this issue:
1. Use the State Processor API to create a new snapshot [1]
2. If this is the operator's only state, changing its uid (together with allowNonRestoredState) should help.
3. Perhaps simply changing the POJO to an empty class would suffice in your case?
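
Option 2, and the answer to Q1 quoted further down, both depend on the granularity of `--allowNonRestoredState`. The flag lets a job skip savepoint state that belongs to operator uids missing from the new program. An operator that keeps its uid, however, still hands all of its keyed state to the backend, which reads every serializer snapshot (the `KeyedBackendSerializationProxy.read` frame in the stack trace). The following toy model sketches this two-step behavior. `ToyRestore`, `ToyRestoreDemo` and the uids `enricher` and `enricher-v2` are hypothetical, and the model only mirrors the observable behavior, not Flink's internals.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model, not Flink code: savepoint state grouped by operator uid, restored in two steps.
final class ToyRestore {
    enum Outcome { RESTORED, REJECTED, CLASS_NOT_FOUND }

    static Outcome restore(Map<String, List<String>> savepoint, // uid -> classes of its keyed states
                           Set<String> jobUids,                 // uids in the new program
                           Set<String> jobClasspath,            // classes in the new job jar
                           boolean allowNonRestoredState) {
        // Step 1 (job level): every savepoint uid must map to an operator, unless the flag is set.
        for (String uid : savepoint.keySet()) {
            if (!jobUids.contains(uid) && !allowNonRestoredState) {
                return Outcome.REJECTED;
            }
        }
        // Step 2 (backend of each matched operator): read ALL of its serializer snapshots.
        for (Map.Entry<String, List<String>> op : savepoint.entrySet()) {
            if (!jobUids.contains(op.getKey())) {
                continue; // skipped operator: its state is never read
            }
            for (String cls : op.getValue()) {
                if (!jobClasspath.contains(cls)) {
                    return Outcome.CLASS_NOT_FOUND;
                }
            }
        }
        return Outcome.RESTORED;
    }
}

final class ToyRestoreDemo {
    static final String POJO =
            "com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc";

    // One operator, hypothetical uid "enricher", with two keyed states; the POJO class is gone.
    static ToyRestore.Outcome run(String uidInNewJob, boolean allowNonRestoredState) {
        Map<String, List<String>> savepoint = new LinkedHashMap<>();
        savepoint.put("enricher", Arrays.asList("java.lang.Long", POJO));
        return ToyRestore.restore(savepoint,
                Collections.singleton(uidInNewJob),
                Collections.singleton("java.lang.Long"),
                allowNonRestoredState);
    }

    public static void main(String[] args) {
        System.out.println(run("enricher", true));     // CLASS_NOT_FOUND: the flag does not help
        System.out.println(run("enricher-v2", false)); // REJECTED: unmapped state needs the flag
        System.out.println(run("enricher-v2", true));  // RESTORED, but the operator starts empty
    }
}
```

In the last case, the restore succeeds only because the renamed operator starts with no keyed state at all. That is why option 2 fits only when the POJO state is the operator's only state.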


Regards,
Roman


On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <[hidden email]> wrote:
Hi 张静,

    Q1: By default, a savepoint restore will try to match all state
back to the restored job. `AllowNonRestoredState` cannot avoid
recovering all state from the savepoint; it only skips the requirement
that all of the restored state be matched back to the restored job. So
the `ClassNotFoundException` cannot be avoided.
Okay.

   Q2: Not really. After you restore the new job from the savepoint
(savepoint1) based on (1), you could take a new savepoint (savepoint2),
then remove the definition of the POJO type. Then you can restore from
savepoint2.
I did that, but it ends up with the same ClassNotFoundException :-(

Here is exactly what I did:
(1) Trigger sp1 from v1
(2) Start v2-1 (which still has the definition of the POJO but doesn't use it at all) from sp1
(3) Trigger sp2 from v2-1
(4) Start v2-2 (without the definition of the POJO) from sp2
(5) v2-2 failed with the same ClassNotFoundException regarding the POJO type

Is v2-2 supposed to start successfully from sp2?

Best,

Dongwon

On Mon, Feb 8, 2021 at 11:48 PM 张静 <[hidden email]> wrote:
Hi, Dongwon,
     Q1: By default, a savepoint restore will try to match all state
back to the restored job. `AllowNonRestoredState` cannot avoid
recovering all state from the savepoint; it only skips the requirement
that all of the restored state be matched back to the restored job. So
the `ClassNotFoundException` cannot be avoided.
     Q2: Not really. After you restore the new job from the savepoint
(savepoint1) based on (1), you could take a new savepoint (savepoint2),
then remove the definition of the POJO type. Then you can restore from
savepoint2.
Please correct me if I'm wrong. Thanks.

Best,
Beyond1920


Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Dongwon Kim-2
Hi Khachatryan,

Thanks for the explanation and the input!

1. Use the State Processor API to create a new snapshot [1]
I haven't used it, but does the API prevent the class of a specific serializer from being loaded?

2. If this is the operator's only state, changing its uid (together with allowNonRestoredState) should help.
Unfortunately, the operator has another per-key state that is very important and cannot be abandoned T.T

3. Perhaps simply changing the POJO to an empty class would suffice in your case?
Yeah, I might have to carry the class definition around for a while.
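
Here is a minimal sketch of option 3. It assumes Roman's untested suggestion holds and that the restore only needs the class name recorded in the savepoint to resolve. The stub must keep the binary name from the stack trace, `com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc`. In the real job, it would therefore be a public static nested class inside the existing `IDataConverter` in that package; the package line and the rest of `IDataConverter` are omitted here. The POJO serializer snapshot also contains nested field serializer snapshots. So if the original fields had other custom types that were deleted as well, their classes may need to stay too.

```java
import java.lang.reflect.Modifier;

// In the real job: package com.kakaomobility.drivinghabit.stream; (omitted here).
// IDataConverter stands in for the existing class; only the nested stub is new.
class IDataConverter {
    // Stub that keeps the old POJO's binary name resolvable for savepoint metadata.
    // It holds no fields and is never used by the job.
    public static class SpeedAndLoc {
        public SpeedAndLoc() {}
    }
}

class StubCheck {
    public static void main(String[] args) {
        Class<?> stub = IDataConverter.SpeedAndLoc.class;
        System.out.println(stub.getName());                         // IDataConverter$SpeedAndLoc
        System.out.println(Modifier.isStatic(stub.getModifiers())); // true
    }
}
```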

Best,

Dongwon


On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <[hidden email]> wrote:
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> ... 9 more
>
>
> So I made two versions of v2:
> 1) v2_1: removes only the keyed states, without removing the definition of the POJO type. I managed to resume it from a savepoint of v1.
> 2) v2_2: removes both the keyed states and the definition of the POJO type. I hoped resuming it from a savepoint of v2_1 would succeed, but it fails with the same exception as above.
>
> Q1) Why doesn't the "--allowNonRestoredState" option suppress ClassNotFoundException?
> Q2) Do I have to live forever with the definition of the POJO type which is no longer necessary?
>
> Best,
>
> Dongwon
>

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Roman Khachatryan
Hi Dongwon,

With the State Processor API, you should be able to create a new snapshot that doesn't reference the unused classes.
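A minimal sketch of that approach, assuming Flink 1.12 with the `flink-state-processor-api` and `flink-statebackend-rocksdb` dependencies. Everything specific here is hypothetical: the operator uid `input-to-idata`, the kept state being a `ValueState<Long>` named `important-state` with `String` keys, and the HDFS paths. Note that this rewrite job still needs `IDataConverter$SpeedAndLoc` on its own classpath, because loading sp1 reads the same serializer snapshots; only the savepoint it writes stops referencing the class. The sketch carries over just the one keyed state, so timers and any broadcast state of the `CoBroadcastWithKeyedOperator` are not preserved.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class DropPojoState {
    // Hypothetical: replace with the operator's real uid and the real state descriptor.
    static final String UID = "input-to-idata";
    static final String STATE_NAME = "important-state";

    /** One (key, value) pair of the state that must survive. */
    public static class KeyedValue {
        public String key;
        public Long value;

        public KeyedValue() {}

        public KeyedValue(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Reads only the important state; the POJO-typed state is never registered here. */
    public static class Reader extends KeyedStateReaderFunction<String, KeyedValue> {
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>(STATE_NAME, Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<KeyedValue> out) throws Exception {
            Long v = state.value();
            if (v != null) {
                out.collect(new KeyedValue(key, v));
            }
        }
    }

    /** Writes the important state into a fresh operator state under the same uid. */
    public static class Writer extends KeyedStateBootstrapFunction<String, KeyedValue> {
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>(STATE_NAME, Types.LONG));
        }

        @Override
        public void processElement(KeyedValue value, Context ctx) throws Exception {
            state.update(value.value);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Savepoints are backend-specific in 1.12, so read and write with RocksDB like the streaming job.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///tmp/state-processor");
        ExistingSavepoint sp1 = Savepoint.load(env, "hdfs:///savepoints/sp1", backend);

        DataSet<KeyedValue> kept = sp1.readKeyedState(UID, new Reader());
        BootstrapTransformation<KeyedValue> rewritten = OperatorTransformation
                .bootstrapWith(kept)
                .keyBy(v -> v.key)
                .transform(new Writer());

        // Replace the old operator state (which references the POJO) with the rewritten one.
        sp1.removeOperator(UID)
           .withOperator(UID, rewritten)
           .write("hdfs:///savepoints/sp1-cleaned");
        env.execute("drop POJO-typed state");
    }
}
```

If this works as intended, v2 could then be started from `sp1-cleaned` with the operator keeping its uid, and the POJO class could be deleted from v2's jar (though not from this rewrite job's).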

Regards,
Roman


On Tue, Feb 9, 2021 at 3:39 AM Dongwon Kim <[hidden email]> wrote:
Hi Khachatryan,

Thanks for the explanation and the input!

> 1. Use the State Processor API to create a new snapshot [1]
I haven't used it, but does the API prevent the class of a specific serializer from being loaded?

> 2. If the operator has only this state then changing uid (together with allowNonRestoredState) should help
Unfortunately, the operator has another per-key state that is very important and cannot be abandoned T.T

> 3. Probably just changing POJO to an empty class will suffice in your case?
Yeah, I might have to keep the class definition around for a while.
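For option 3, a sketch of what the leftover definition could shrink to. It rests on an assumption (the thread only says it will "probably" suffice): reading an old serializer snapshot for a state that nobody registers needs little more than resolving the class by name, so an empty public class with a public no-argument constructor may be enough. In the real job `SpeedAndLoc` stays nested in the existing `com.kakaomobility.drivinghabit.stream.IDataConverter`; the package-less outer class and the hypothetical `StubCheck` helper are stand-ins so the snippet compiles on its own.

```java
// Stand-in for the real com.kakaomobility.drivinghabit.stream.IDataConverter.
// In the actual job, keep the nested class inside that existing public class.
class IDataConverter {

    /**
     * Placeholder kept only so that resolving the class by name succeeds while
     * old serializer snapshots are read. All fields are gone; v2 never uses it.
     */
    public static class SpeedAndLoc {
        public SpeedAndLoc() {
        }
    }
}

class StubCheck {
    /** Resolves a class by binary name without initializing it, as snapshot readers do. */
    static boolean resolvable(String binaryName) {
        try {
            Class.forName(binaryName, false, StubCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```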

Best,

Dongwon


On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <[hidden email]> wrote:
Hi,

I'm pulling in Yun Tang, who is familiar with state backends and RocksDB in particular.

From what I see, the second snapshot (sp2) writes its metadata using the same set of states that was obtained from the starting savepoint/checkpoint (sp1). This metadata includes serializer snapshots, among them the PojoSerializer snapshot for your custom type. On restore, this metadata is read and the POJO class itself is loaded.
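A simplified, standard-library-only model of that last step (hypothetical names, not Flink's classes). As the frames `PojoSerializerSnapshotData.readSnapshotData` and `InstantiationUtil.resolveClassByName` in the original stack trace suggest, the serializer snapshot records the POJO class by name, and reading it back calls `Class.forName`, so the class must be present even if no code touches the state. The error message is modeled on the `IOException` in that trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class SnapshotModel {
    /** Writes the POJO class reference by name, as recorded when the snapshot was taken. */
    static byte[] writeSnapshot(String className) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(className);
        }
        return bytes.toByteArray();
    }

    /** Reads the name back and resolves it; fails if the class is no longer on the classpath. */
    static Class<?> readSnapshot(byte[] snapshot, ClassLoader loader) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            String className = in.readUTF();
            try {
                return Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                throw new IOException("Could not find class '" + className + "' in classpath.", e);
            }
        }
    }
}
```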

I see the following ways to overcome this issue:
1. Use the State Processor API to create a new snapshot [1]
2. If the operator has only this state then changing uid (together with allowNonRestoredState) should help
3. Probably just changing POJO to an empty class will suffice in your case?
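To illustrate why option 2 helps while `--allowNonRestoredState` alone does not, here is a deliberately simplified model (hypothetical names, not Flink's code) of the two levels involved. The flag only matters when operator uids in the savepoint are matched against the new job. For an operator whose uid still matches, the keyed state backend reads the metadata of every state in its snapshot, whether or not the new code registers it, and resolves each serializer's class. Changing the uid turns the whole operator into unmatched state, which the flag then lets Flink drop, including the important state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RestoreModel {
    /**
     * savepoint: operator uid -> (state name -> class name recorded in its serializer snapshot).
     * jobUids:   uids of the operators in the new job.
     * Returns the uids whose state was restored.
     */
    static List<String> restore(Map<String, Map<String, String>> savepoint,
                                Set<String> jobUids,
                                boolean allowNonRestoredState) throws ClassNotFoundException {
        List<String> restored = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> op : savepoint.entrySet()) {
            if (!jobUids.contains(op.getKey())) {
                // Level 1: no operator with this uid. This is the only place the flag matters.
                if (!allowNonRestoredState) {
                    throw new IllegalStateException("Cannot map state of operator " + op.getKey());
                }
                continue;
            }
            // Level 2: matched operator. Every state's metadata is read, so every serializer
            // class must resolve, even for states the new code never registers.
            for (String className : op.getValue().values()) {
                Class.forName(className, false, RestoreModel.class.getClassLoader());
            }
            restored.add(op.getKey());
        }
        return restored;
    }
}
```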


Regards,
Roman


On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <[hidden email]> wrote:
Hi 张静,

> Q1: By default, a savepoint restore tries to match all state
> back to the restored job. `allowNonRestoredState` does not prevent
> the state in the savepoint from being recovered; it only relaxes the
> requirement that all of the restored state be matched back to the
> restored job. So the `ClassNotFoundException` cannot be avoided.
Okay.

> Q2: Not really. After you recover the new job from the savepoint
> (savepoint1) based on (1), you can take a new savepoint (savepoint2)
> and then remove the definition of the POJO type. Then you can restore
> from savepoint2.
I did that, but it ends up with the same ClassNotFoundException :-(

Here is exactly what I did:
(1) Trigger sp1 from v1
(2) Start v2-1 (with the definition of the POJO, but without using it at all) from sp1
(3) Trigger sp2 from v2-1
(4) Start v2-2 (without the definition of the POJO) from sp2
(5) v2-2 fails with the same ClassNotFoundException for the POJO type

Is v2-2 supposed to start successfully from sp2?
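The five steps above can be replayed in a small model of Roman's explanation (hypothetical names; a sketch under that assumption, not Flink's implementation). A keyed backend that restores from a snapshot keeps the metadata of every restored state, and its next snapshot writes all of it again, including states that v2-1 never registered. So sp2 still names the POJO class, and v2-2 fails just as a restore of sp1 without the class would.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class BackendModel {
    private final Set<String> classpath;                                     // classes this job version ships
    private final Map<String, String> stateMetaInfo = new LinkedHashMap<>(); // state name -> serializer's class

    BackendModel(Set<String> classpath) {
        this.classpath = classpath;
    }

    /** Restore: every state's metadata is read and its class resolved, used or not. */
    void restore(Map<String, String> snapshot) throws ClassNotFoundException {
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            if (!classpath.contains(e.getValue())) {
                throw new ClassNotFoundException(e.getValue());
            }
            stateMetaInfo.put(e.getKey(), e.getValue());
        }
    }

    /** The job registers a state descriptor, e.g. in open(). */
    void register(String stateName, String className) {
        stateMetaInfo.put(stateName, className);
    }

    /** Snapshot: metadata of all known states is written, registered or merely restored. */
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(stateMetaInfo);
    }
}
```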

Best,

Dongwon

On Mon, Feb 8, 2021 at 11:48 PM 张静 <[hidden email]> wrote:
Hi, Dongwon,
     Q1: By default, a savepoint restore tries to match all state
back to the restored job. `allowNonRestoredState` does not prevent
the state in the savepoint from being recovered; it only relaxes the
requirement that all of the restored state be matched back to the
restored job. So the `ClassNotFoundException` cannot be avoided.
     Q2: Not really. After you recover the new job from the savepoint
(savepoint1) based on (1), you can take a new savepoint (savepoint2)
and then remove the definition of the POJO type. Then you can restore
from savepoint2.
Please correct me if I'm wrong. Thanks.

Best,
Beyond1920

On Mon, Feb 8, 2021 at 9:43 PM Dongwon Kim <[hidden email]> wrote:
> So I made two versions of v2:
> 1) v2_1: removes only the keyed states, without removing the definition of the POJO type. I managed to resume it from a savepoint of v1.
> 2) v2_2: removes both the keyed states and the definition of the POJO type. I hoped resuming it from a savepoint of v2_1 would succeed, but it fails with the same exception as above.
>
> Q1) Why doesn't the "--allowNonRestoredState" option suppress ClassNotFoundException?
> Q2) Do I have to live forever with the definition of the POJO type which is no longer necessary?
>
> Best,
>
> Dongwon
>