Hello,
I'm currently investigating a state compatibility issue when upgrading from Flink 1.9 to Flink 1.11. We are seeing "FlinkException: Could not restore keyed state backend for WindowOperator", caused by "StateMigrationException: The new key serializer must be compatible" (stack trace below). Our application uses the legacy planner with the Table API. I noticed that the work on FLINK-16998 [1] was released in Flink 1.11, and I believe it is the cause of our state issues. We assume the issue arises because Flink doesn't support state migration for keys [2]. Is this assumption correct? Also, is there any workaround for us to recover our state? I noticed that the State Processor API supports reading and writing window state as of Flink 1.12 [3]. Would upgrading directly to Flink 1.12 and manipulating our savepoints with the State Processor API be a viable option?

Here is the stack trace:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_c6b609a3afd99a8def9da162cdd0e7db_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
	at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]

Thanks,
Charles
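A toy model may help illustrate why a changed key serializer is a problem for restoring keyed state. It is plain Java, not Flink code: the class KeyFormatDemo, its methods, and both byte layouts are hypothetical and do not reproduce Flink's actual RowSerializer format or its compatibility check (which rejects the restore up front, before any lookup happens). The model relies only on the fact that the RocksDB backend addresses keyed state by the serialized key bytes. When the same logical key is written with a different layout, a lookup with the new bytes misses the old entry; the state becomes reachable again only after every entry is read with the old format and rewritten with the new one, which is roughly what rewriting a savepoint with the State Processor API amounts to.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class KeyFormatDemo {

    // Hypothetical "old" key layout: [null-mask byte][int length][UTF-8 bytes].
    static byte[] serializeOld(String field) {
        byte[] utf8 = field.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + utf8.length);
        buf.put((byte) 0);       // null mask: the single field is non-null
        buf.putInt(utf8.length); // length prefix
        buf.put(utf8);
        return buf.array();
    }

    // Hypothetical "new" key layout: one extra header byte in front of the old layout.
    static byte[] serializeNew(String field) {
        byte[] utf8 = field.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + utf8.length);
        buf.put((byte) 0);       // hypothetical extra header byte (e.g. a row-kind marker)
        buf.put((byte) 0);       // null mask
        buf.putInt(utf8.length);
        buf.put(utf8);
        return buf.array();
    }

    // Decodes the hypothetical old layout back into the logical key.
    static String deserializeOld(ByteBuffer key) {
        ByteBuffer buf = key.duplicate(); // read a copy so a map key's position stays untouched
        buf.get();                        // skip the null mask
        byte[] utf8 = new byte[buf.getInt()];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // State is addressed by serialized key bytes; ByteBuffer equality compares content.
    static Long lookup(Map<ByteBuffer, Long> state, byte[] serializedKey) {
        return state.get(ByteBuffer.wrap(serializedKey));
    }

    // Rewrites every entry: read the key with the old format, write it with the new one.
    static Map<ByteBuffer, Long> migrateKeys(Map<ByteBuffer, Long> oldState) {
        Map<ByteBuffer, Long> newState = new HashMap<>();
        for (Map.Entry<ByteBuffer, Long> e : oldState.entrySet()) {
            String logicalKey = deserializeOld(e.getKey());
            newState.put(ByteBuffer.wrap(serializeNew(logicalKey)), e.getValue());
        }
        return newState;
    }

    public static void main(String[] args) {
        Map<ByteBuffer, Long> savedState = new HashMap<>();
        savedState.put(ByteBuffer.wrap(serializeOld("user-42")), 7L);

        // Same logical key, new byte layout: the old entry is not found.
        System.out.println(lookup(savedState, serializeNew("user-42")));
        // After rewriting all entries under the new layout, the lookup succeeds.
        System.out.println(lookup(migrateKeys(savedState), serializeNew("user-42")));
    }
}
```

Running main prints null for the lookup against the original entries and 7 once migrateKeys has rewritten them under the new layout.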
Hi Charles,

I fear that your investigation is correct. I'd also assume that you have to use the State Processor API.

Do I assume correctly that your colleague Günther also enquired about the same issue [1]? If so, let's keep the discussion there.

On Tue, Dec 22, 2020 at 3:57 AM Charles Tan <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany