Flink upgraded from 1.10.0 to 1.12.0

王炳焱
When I upgraded from Flink 1.10.0 to Flink 1.12.0, I was unable to restore from a savepoint. The restore failed with the following error:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 27 more
2021-05-14 22:02:44,880 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_9d9aafaedca3e4d635d2a1193610351d_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 16 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 16 more




This job is a SQL job written for Flink 1.10.0, and the old savepoint contains some aggregated state. How do I restore that state after upgrading to Flink 1.12.0?
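
For readers following the trace: the failing frames (InstantiationUtil.resolveClassByName calling Class.forName) suggest that the restore looks up the serializer snapshot class by the name recorded in the savepoint, and that this class is not present in the 1.12.0 jars. The plain-Java sketch below only illustrates that kind of by-name lookup. It is not Flink's code; the class ResolveByNameDemo and the helper isOnClasspath are hypothetical names introduced for the illustration.

```java
final class ResolveByNameDemo {

    // Class name copied verbatim from the ClassNotFoundException in the log above.
    static final String OLD_SNAPSHOT_CLASS =
        "org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot";

    /** Hypothetical helper: true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only look the class up, do not run its static initializers.
            Class.forName(className, false, ResolveByNameDemo.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            // Same exception type as the root cause reported in the stack trace.
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class always resolves.
        System.out.println("java.lang.String: " + isOnClasspath("java.lang.String"));
        // The result for this class depends on which jars are on the classpath.
        System.out.println(OLD_SNAPSHOT_CLASS + ": " + isOnClasspath(OLD_SNAPSHOT_CLASS));
    }
}
```

Running this with the job's classpath on each Flink version is one way to check whether the class named in the error is available; judging by the log above, it is not found on 1.12.0.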



 

Re: Flink upgraded from 1.10.0 to 1.12.0

Matthias
Hi 王炳焱,
Thanks for reaching out to the Flink community, and sorry for the late reply. Unfortunately, Flink SQL does not support backwards compatibility of state between versions. The documentation does not currently state this clearly, so I created FLINK-22799 [1] to cover that. In the meantime, you could try using the State Processor API [2] to rewrite the savepoint as a workaround. I haven't used the API myself yet, so I cannot give concrete hints on how to do it.

I hope that helps.

Best,
Matthias


On Tue, May 18, 2021 at 2:11 PM 王炳焱 wrote:
When I upgraded from Flink 1.10.0 to Flink 1.12.0, I was unable to restore from a savepoint. [...] How do I restore the state after upgrading to Flink 1.12.0?