Hello! We are working with a Scala-based pipeline.
We changed

case class Record(orgId: Int)

to

case class Record(orgId: Int, operationId: Option[String] = None)

and now our savepoints fail with this exception:

org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:577)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

I was under the impression we could add items to case classes and still be able to use existing state to start the job.

-Steve |
The Option class is not serializable. If you put something serializable into that case class, you wouldn’t have problems.
|
Are you sure? I just restarted the job with the new version, but not from a savepoint, then took a new savepoint, and it seemed to work from there. It just seemed like it couldn’t upgrade the schema during restore.
|
I’m pretty sure java.util.Optional is not serializable: https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc
However, on a second look I can now see you’re using Scala’s Option, which IS serializable :) My apologies for that. So your problem was that you had the previous version in your savepoint, which of course cannot be deserialized into the new version without custom code that would handle a missing Option.
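To illustrate what "custom code that would handle a missing Option" could look like, here is a minimal sketch in plain Scala. It is not Flink's serializer or its savepoint format: the version tag, the byte layout, and the names RecordV1 and VersionedRecordCodec are all made up for this example. It only shows the general idea that a reader has to know which layout it is looking at and fill in None when the field was never written.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Old and new shapes of the record from this thread.
// RecordV1 is a hypothetical name for the pre-change class.
case class RecordV1(orgId: Int)
case class Record(orgId: Int, operationId: Option[String] = None)

// Hypothetical layout, NOT Flink's format:
//   [version: Int][orgId: Int]                                         (version 1)
//   [version: Int][orgId: Int][hasOperationId: Boolean][operationId]   (version 2)
object VersionedRecordCodec {

  // Writes the old layout, which has no operationId at all.
  def writeV1(r: RecordV1): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(1)
    out.writeInt(r.orgId)
    out.flush()
    bytes.toByteArray
  }

  // Writes the new layout, with a presence flag for the Option.
  def writeV2(r: Record): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(2)
    out.writeInt(r.orgId)
    out.writeBoolean(r.operationId.isDefined)
    r.operationId.foreach(id => out.writeUTF(id))
    out.flush()
    bytes.toByteArray
  }

  // Reads either layout into the new class; old data gets operationId = None.
  def read(data: Array[Byte]): Record = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readInt() match {
      case 1 =>
        Record(in.readInt())
      case 2 =>
        val orgId = in.readInt()
        val operationId = if (in.readBoolean()) Some(in.readUTF()) else None
        Record(orgId, operationId)
      case v =>
        throw new IllegalStateException(s"Unknown record version: $v")
    }
  }
}
```

With this sketch, VersionedRecordCodec.read(VersionedRecordCodec.writeV1(RecordV1(42))) yields Record(42, None). A reader that only understands the version 2 layout would instead try to read a presence flag that was never written, which is roughly the situation a restore from an old savepoint runs into.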
|
No problem :)
I wasn’t able to find documentation on what can and cannot be upgraded for case classes. I had assumed the same rules that apply to POJO schema evolution also apply to case classes. Has someone put together rules for case classes? I also should have mentioned we are running Flink 1.9.
|
Hi Steven,

From the exception, it seems the serializers used before and after the change are incompatible. I'm not very familiar with Scala case classes; maybe you can debug it locally to see which serializer is used for the case class before and after the change.

Best,
Congxian

Steven Nelson <[hidden email]> wrote on Wed, Oct 9, 2019 at 4:09 AM:
|