Login  Register

Flink upgraded from 1.10.0 to 1.12.0

classic Classic list List threaded Threaded
2 messages Options Options
Embed post
Permalink
Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Flink upgraded from 1.10.0 to 1.12.0

王炳焱
When I upgraded from Flink1.10.0 to Flink1.12.0.  Unable to restore SavePoint  And prompt the following error  


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 27 more
2021-05-14 22:02:44,880 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_9d9aafaedca3e4d635d2a1193610351d_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 16 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 16 more




This task is an SQL task for Flink1.10.0. I stored some aggregated state in the old SavePoint. How do I restore the state after upgrading to Flink1.12.0  



 

Reply | Threaded
Open this post in threaded view
| More
Print post
Permalink

Re: Flink upgraded from 1.10.0 to 1.12.0

Matthias
Hi 王炳焱,
thanks for reaching out to the Flink community and sorry for the late reply. Unfortunately, Flink SQL does not support state backwards compatibility. There is no clear pointer in the documentation that states that. I created FLINK-22799 [1] to cover this. In the mean time, you could try using the state processor API [2] to rewrite the savepoint to workaround your issue. I haven't used the API myself, yet. Hence, I cannot give direct hints on how to do it.

I hope that helps.

Best,
Matthias


On Tue, May 18, 2021 at 2:11 PM 王炳焱 <[hidden email]> wrote:
When I upgraded from Flink1.10.0 to Flink1.12.0.  Unable to restore SavePoint  And prompt the following error  


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 27 more
2021-05-14 22:02:44,880 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_9d9aafaedca3e4d635d2a1193610351d_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 16 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 16 more




This task is an SQL task for Flink1.10.0. I stored some aggregated state in the old SavePoint. How do I restore the state after upgrading to Flink1.12.0