Flink 1.11 State Compatibility Issue Table API


Charles Tan
Hello,

I'm currently investigating a state compatibility issue when upgrading from Flink 1.9 to Flink 1.11. We are seeing "FlinkException: Could not restore keyed state backend for WindowOperator" caused by "StateMigrationException: The new key serializer must be compatible" (stack trace provided below). Our application uses the legacy planner with the Table API. I noticed that the work on FLINK-16998 [1] was introduced in the Flink 1.11 release, and I believe it is the cause of our state issues. We assume the restore fails because Flink doesn't allow state migration for keys [2]. Is this assumption correct? Also, is there any workaround that would let us recover our state? I noticed that the State Processor API supports reading and writing window state as of Flink 1.12 [3]. Could upgrading directly to Flink 1.12 and manipulating our savepoints with the State Processor API be a viable option?


Here is the stack trace:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_c6b609a3afd99a8def9da162cdd0e7db_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
	at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]

Thanks,
Charles

Re: Flink 1.11 State Compatibility Issue Table API

Arvid Heise-3
Hi Charles,

I fear that your investigation is correct. I'd also assume that you have to use the State Processor API.

Do I assume correctly that your colleague Günther also enquired about the same issue [1]? If so, let's keep the discussion there.
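
To illustrate why an incompatible key serializer is a hard failure rather than something that can be migrated lazily, here is a minimal toy sketch in plain Java. It is not Flink code: the class `KeySerializerMismatch`, both serializer methods, and the extra leading flag byte are assumptions made up for the illustration. It only models the general idea that a keyed store addresses entries by the serialized bytes of the key, so a changed key format can no longer find entries written with the old one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these names are Flink classes. */
public class KeySerializerMismatch {

    /** Hypothetical "old" key format: just the UTF-8 bytes of the key. */
    static byte[] serializeOld(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /** Hypothetical "new" key format: one leading flag byte, then the same payload. */
    static byte[] serializeNew(String key) {
        byte[] payload = key.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[payload.length + 1];
        out[0] = 0; // assumed extra header byte added by the new format
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    /** Stores a value under the serialized key bytes, as a byte-addressed store would. */
    static void put(Map<ByteBuffer, Long> store, byte[] keyBytes, long value) {
        store.put(ByteBuffer.wrap(keyBytes), value);
    }

    /** Looks a value up by serialized key bytes; returns null if no entry matches. */
    static Long lookup(Map<ByteBuffer, Long> store, byte[] keyBytes) {
        return store.get(ByteBuffer.wrap(keyBytes));
    }

    public static void main(String[] args) {
        Map<ByteBuffer, Long> store = new HashMap<>();

        // State written by the "old" job version.
        put(store, serializeOld("user-42"), 7L);

        // Same logical key, same format: the entry is found.
        System.out.println("old format lookup: " + lookup(store, serializeOld("user-42")));

        // Same logical key, new format: different bytes, so the entry is not found.
        System.out.println("new format lookup: " + lookup(store, serializeNew("user-42")));
    }
}
```

In this toy model the only way to make the old entries reachable again is to rewrite every key in the new format, which is roughly the kind of offline rewrite that reading the savepoint and writing a new one with the State Processor API would amount to.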


On Tue, Dec 22, 2020 at 3:57 AM Charles Tan wrote:
> [original message and stack trace, quoted in full above]


--

Arvid Heise | Senior Java Developer

