State Restoration issue with flink 1.10.1

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

State Restoration issue with flink 1.10.1

ApoorvK
Hi,

Presently I have a flink application running on version 1.8.2 I have taken a
savepoint on the running app which is stored in s3 , Now I have changed my
flink version to 1.10.1 , Now when I running the new application on version
flink-1.10.1 from the savepoint taken on flink 1.8.2 it is throwing below
error:


2020-07-29 15:24:36
java.lang.RuntimeException: Error while getting state
        at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
        at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.<init>(SharedBuffer.java:72)
        at
org.apache.flink.cep.operator.CepOperator.initializeState(CepOperator.java:178)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
        at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
        ... 11 more



Please suggest what Can I do here



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: State Restoration issue with flink 1.10.1

Yun Tang
Hi

I compared the implementation of CepOperator between Flink-1.10.1 and Flink-1.8.2, however they should behave the same as code for map state does not change much. The error you meet might be caused by the change of inputSerializer [1], could you check whether you have introduced any difference for this?

BTW, you could also try Flink-1.9.x and Flink-1.11 to see whether problem still existed.


Best
Yun Tang

From: ApoorvK <[hidden email]>
Sent: Wednesday, July 29, 2020 18:00
To: [hidden email] <[hidden email]>
Subject: State Restoration issue with flink 1.10.1
 
Hi,

Presently I have a flink application running on version 1.8.2 I have taken a
savepoint on the running app which is stored in s3 , Now I have changed my
flink version to 1.10.1 , Now when I running the new application on version
flink-1.10.1 from the savepoint taken on flink 1.8.2 it is throwing below
error:


2020-07-29 15:24:36
java.lang.RuntimeException: Error while getting state
        at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
        at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.<init>(SharedBuffer.java:72)
        at
org.apache.flink.cep.operator.CepOperator.initializeState(CepOperator.java:178)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
        at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
        ... 11 more



Please suggest what Can I do here



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/