Hi,

I recently had to update the code of a long-running Flink streaming job (1.3.2), and on restart from the savepoint I ran into:

    java.lang.IllegalStateException: Could not initialize keyed state backend.
    Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local class incompatible:
      stream classdesc serialVersionUID = 8728793377341765980,
      local class serialVersionUID = -4253404384162522764

This happened because I changed a method used to convert the Event to a Cassandra-writable tuple (specifically, I changed the return type from Tuple10 to Tuple11 after adding a field). I reverted those changes, since they weren't critical per se.

Now, I understand the root cause of this issue, but I wanted to ask: are there any best practices to prevent this kind of issue, so that the job doesn't lose its state by having to be restarted from the very beginning?

--
Federico D'Ambrosio
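For illustration, here is a minimal sketch (the class and field names are assumptions, not the actual Event class) showing why this happens: when no serialVersionUID is declared, the JVM derives a default one from the full class signature, so adding a field or changing a method's return type produces a different value:

    import java.io.ObjectStreamClass

    // Illustrative stand-in for the real Event class (fields are assumptions).
    // No explicit serialVersionUID is declared, so the JVM computes one from
    // the class signature: fields, constructors, and non-private methods.
    case class Event(id: String, timestamp: Long)

    object ShowUid {
      def main(args: Array[String]): Unit = {
        // Adding a field, or changing a method's return type (e.g. Tuple10 ->
        // Tuple11), changes this computed value, so a savepoint written with
        // the old class no longer matches the new one.
        val uid = ObjectStreamClass.lookup(classOf[Event]).getSerialVersionUID
        println(s"computed serialVersionUID = $uid")
      }
    }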
Hi Federico,

It seems that the state cannot be restored because the class of the state type (i.e., Event) was modified after the savepoint was taken, and therefore has a serialVersionUID that conflicts with the one in the savepoint. This can happen if Java serialization is used for some part of your state and the class of the written data is modified without a fixed serialVersionUID being explicitly specified for it.

To avoid this, you should explicitly set a serialVersionUID for the Event class. You can actually do that now, without losing state, while also incorporating the modifications you were trying to make for your updated job: explicitly declare the serialVersionUID of the Event class to be what it was before your modifications (i.e., 8728793377341765980, according to your error log).

One side question: are you experiencing this restore failure for one of your custom operator states, or is the failing state part of some Flink built-in operator / connector? I'm asking just to get an idea of which Flink built-in operators / connectors still use Java serialization for user state; ideally, we would want that to be completely removed in the future.

Gordon
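A minimal sketch of this fix in Scala (assuming Event is a case class; the field list, including the newly added field, is illustrative):

    // Pin the serialVersionUID to the value recorded in the savepoint (the
    // "stream classdesc serialVersionUID" from the error), so the modified
    // class still deserializes the previously written state.
    @SerialVersionUID(8728793377341765980L)
    case class Event(id: String, timestamp: Long, newField: String)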
On 28 November 2017 at 10:02:19 PM, Federico D'Ambrosio ([hidden email]) wrote:
Hi Gordon,

Explicitly specifying the serialVersionUID did the job, thank you!

The failing task was latest_time -> (cassandra-map -> Sink: cassandra-active-sink, map_active_stream, map_history_stream), from a pipeline like the following:

    val events = keyedstream
      .timeWindow(Time.seconds(20))
      .maxBy("field")
      .name("latest-time")

    CassandraSink.addSink(
        events.map(_.toCassandraTuple).name("cassandra-map").javaStream)
      .setQuery(...)
      .setClusterBuilder(...)
      .build()
      .name("cassandra-sink")

with cassandra-map, map_history_stream and map_active_stream being stateless map functions.

So, I guess the culprit was either the window/maxBy operator or the Cassandra sink; most likely the window/maxBy operator, since the exception mentions the initialization of a keyed state backend. I'm attaching the complete log.

Cheers,
Federico

2017-11-28 15:32 GMT+01:00 Tzu-Li (Gordon) Tai <[hidden email]>:
--
Federico D'Ambrosio

log.txt (4K)
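A related safeguard worth noting here (a sketch with assumed names, not the exact job code): Flink also lets you assign a stable uid to each stateful operator, so that savepoint state can be matched back to the right operator even if the job topology changes between versions:

    import org.apache.flink.streaming.api.windowing.time.Time

    // keyedstream is assumed to be the keyed input stream from the job above.
    // Giving each stateful operator a stable uid lets Flink map savepoint
    // state to operators independently of auto-generated operator ids.
    val events = keyedstream
      .timeWindow(Time.seconds(20))
      .maxBy("field")
      .uid("latest-time")   // stable id for savepoint-to-operator state mapping
      .name("latest-time")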