Hey all,
I've run into an issue with the State Processor API. To demonstrate it, I've created a reference repository: https://github.com/segmentio/flink-state-management

The current implementation of the pipeline has left us with keyed state that we no longer need, and we don't have references to some of the old keys. My plan was to:

1. create a savepoint
2. read the keys from each operator (using the State Processor API)
3. filter out all the keys that are no longer used
4. bootstrap a new savepoint that contains the filtered state

I managed to get this working using a sample pipeline and a very basic key (a string), but when I switched the key to something more complex (a case class of two strings), I started seeing this exception:

Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 13 more

Has anyone come across this before and figured out a fix? Any help you can give would be greatly appreciated!

Thanks,
Hi,

Just to clarify: I quickly went through the README of the project, and saw this:

"This error is seen after trying to read from a savepoint that was created using the same case class as a key."

So, if I understood correctly, you were attempting to use the State Processor API to access a savepoint that was written with a Scala DataStream job, correct?

If that's the case, I'm afraid this would not work as of now. See [1] for a similar scenario that others have also bumped into.

TL;DR: the State Processor API is currently not guaranteed to work for snapshots that are written with Scala DataStream jobs.

For now, I'll add a big warning about this to the docs. But in general, it seems like we might want to consider bumping up the priority for enabling this, as quite a few users are using the Scala DataStream API for their jobs.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe <[hidden email]> wrote:
There might be a possible workaround for this, for now:

Basically, the trick is to explicitly tell the State Processor API which type information to use to access the keyed state. You can do that with `ExistingSavepoint#readKeyedState(String uid, KeyedStateReaderFunction function, TypeInformation<K> keyTypeInfo, TypeInformation<OUT> outTypeInfo)`.

This allows the State Processor API to bypass the Java type information extraction process (which is not compatible with how it is done in the Scala DataStream API right now, hence the StateMigrationException you are getting).

What you'd have to do is, in your pipeline job, explicitly generate the serializer / type information using either the Scala DataStream macro `createTypeInformation` or a custom serializer. Then, specify that same serializer / type info when reading keyed state with the State Processor API.

Simply put: you explicitly specify which serializer to use for the keys, and tell the State Processor API to use that same serializer to access state.

This is not nice, but should work for now. It would be interesting to hear how that works out for you.

As mentioned above, a possible ideal solution eventually is for type information extraction to be converged across the Java / Scala DataStream APIs.

Cheers,
Gordon

On Wed, Feb 19, 2020 at 10:20 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
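
For readers following along, here is a rough sketch of what the workaround described in this reply might look like in Scala. It assumes the Flink 1.10-era, DataSet-based State Processor API and the RocksDB state backend seen in the stack trace. The key class `EventKey`, the operator uid `"my-operator-uid"`, the state name `"count"` with its `java.lang.Long` value type, and both file paths are hypothetical placeholders, not taken from the original repository; they would have to match whatever the real job uses.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Hypothetical key type: a case class of two strings, as in the question.
case class EventKey(tenant: String, id: String)

// Emits every key found in the operator's keyed state.
class KeyReader extends KeyedStateReaderFunction[EventKey, EventKey] {

  // Assumption: the operator holds a ValueState named "count".
  // The descriptor is registered so the reader knows which state to scan;
  // its name and type must match the descriptor used in the streaming job.
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def readKey(
      key: EventKey,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[EventKey]): Unit = {
    out.collect(key)
  }
}

object ReadKeys {
  def main(args: Array[String]): Unit = {
    // The State Processor API works on the Java batch environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Type information produced by the Scala macro, i.e. the same way the
    // Scala DataStream job derives it for the key.
    val keyInfo: TypeInformation[EventKey] = createTypeInformation[EventKey]

    // Placeholder paths; point these at the real savepoint and checkpoint dir.
    val savepoint = Savepoint.load(
      env,
      "file:///tmp/savepoint-path",
      new RocksDBStateBackend("file:///tmp/checkpoints"))

    // Pass the key and output type information explicitly so that the Java
    // type extraction is not used for the key serializer.
    val keys = savepoint.readKeyedState("my-operator-uid", new KeyReader, keyInfo, keyInfo)

    keys.print()
  }
}
```

The important part is the four-argument `readKeyedState` call: because `keyInfo` comes from `createTypeInformation`, the key serializer used for reading should line up with the one the Scala job wrote into the savepoint. Whether this fully avoids the StateMigrationException in a given setup would need to be verified against the actual job.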