State Processor API: StateMigrationException for keyed state

pwestermann
I am trying to use the new State Processor API, but I am having trouble with
keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend).
I can read keyed state for simple key types such as Strings, but whenever I
try to read state with a more complex key type, such as a named tuple
type (for example ), I get a StateMigrationException:



Any idea what could be wrong?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: State Processor API: StateMigrationException for keyed state

vino yang
Hi pwestermann,

Can you share the relevant detailed exception message?

Best,
Vino

On Fri, Dec 13, 2019 at 2:00 AM, pwestermann <[hidden email]> wrote:
I am trying to get the new State Processor API but I am having trouble with
keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend).
I can read keyed state for simple key type such as Strings but whenever I
tried to read state with a more complex key type - such as a named tuple
type (for example ), I get a StateMigrationException:



Any idea what could be wrong?



Re: State Processor API: StateMigrationException for keyed state

pwestermann

Sorry, I posted it, but I guess it got dropped when the message was formatted. Here's another attempt:
2019-12-12 12:50:08
java.io.IOException: Failed to restore state backend
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
    ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for ebe1b56bc6601c8bccba93887bec8059_ebe1b56bc6601c8bccba93887bec8059_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 13 more

From: vino yang <[hidden email]>
Date: Thursday, December 12, 2019 at 8:46 PM
To: Peter Westermann <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: State Processor API: StateMigrationException for keyed state

Hi pwestermann,

Can you share the relevant detailed exception message?

Best,
Vino