Adding a new field in java class -> restore fails with "KryoException: Unable to find class"

Juho Autio
Is it possible to add new fields to the object type of a stream, and then restore from savepoint?

I tried to add a new "private String" field to my Java class. It previously had one "private String" field and a "private final Map<String, String>". When I tried to restore an old savepoint after this code change, the restore failed with "KryoException: Unable to find class".

Is it possible to evolve the stream classes and restore old state after such changes? It would be enough for me if new fields were set to null when restoring such objects, and if the restored values of deleted fields were simply ignored.

Here's a full stack trace:

2018-03-07 08:49:03,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - EnrichIdFunction -> AppIdFilter([appsimulator_236e5fb7]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, EventMapper -> ThreadPoolGateway (capacity=10) -> Sink: ResponseKafkaSink) (7/8) (66b47839cefef8518605ece669709c65) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Could not initialize operator state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:330)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:241)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: �mo
Serialization trace:
params (com.rovio.ds.flink.http.Event)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:203)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:552)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:368)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:737)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:328)
... 6 more
Caused by: java.lang.ClassNotFoundException: �mo
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 18 more

I used Flink 1.5-SNAPSHOT for this.
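
A toy illustration of the likely failure mode, as an assumption rather than something confirmed in this thread: a field-by-field binary layout without field names or tags is read back positionally, so once the class gains a field, the reader interprets old bytes as the wrong fields. That would be consistent with the garbage class name in the trace above. The sketch below is plain Java and not Kryo; `PositionalDemo` and its field layout are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Toy positional format: fields are written in a fixed order with no names or tags.
// This is NOT Kryo; it only mimics an untagged field-by-field layout.
final class PositionalDemo {

    // "Old" class layout: id, then a single map entry (key, value).
    static byte[] writeOld(String id, String key, String value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(id);
            out.writeUTF(key);
            out.writeUTF(value);
        }
        return bytes.toByteArray();
    }

    // "New" reader expects: appId, id, key, value.
    // Given old bytes, every field is shifted by one position
    // and the last read runs off the end of the data.
    static String[] readNew(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String appId = in.readUTF(); // actually receives the old id
            String id = in.readUTF();    // actually receives the old key
            String key = in.readUTF();   // actually receives the old value
            String value = in.readUTF(); // nothing left: throws EOFException
            return new String[] {appId, id, key, value};
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] old = writeOld("event-1", "k", "v");
        try {
            readNew(old);
        } catch (EOFException e) {
            System.out.println("old bytes cannot be read with the new layout");
        }
    }
}
```

In a real serializer the misread bytes may be interpreted as a class name or a length instead of hitting end-of-data, which is why the symptom can look unrelated to the actual change.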



FYI, how I solved this for now:

As a workaround for my case, I happened to have a Map<String, String> in the object that I could use as a bit of a hack. I stored the new field's value in the map and removed it again afterwards to clean up. This way map.get returns null for objects restored from the old savepoint and a non-null value for any new instances.
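
Roughly, the workaround looks like the sketch below. The class name `MapBackedEvent`, the key `__appId`, and the accessor names are made up for illustration; the real class has different names. The point is only that the class layout stays unchanged while the new value travels inside the existing map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real event class; all names here are assumptions.
final class MapBackedEvent {
    // Reserved map key that carries the "new field" (hypothetical name).
    private static final String APP_ID_KEY = "__appId";

    private final String id;
    private final Map<String, String> params = new HashMap<>();

    MapBackedEvent(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }

    Map<String, String> getParams() {
        return params;
    }

    // Store the new value inside the existing map, so no field is added to the class.
    void setAppId(String appId) {
        params.put(APP_ID_KEY, appId);
    }

    // Returns null when the key is absent, e.g. for objects restored from an old savepoint.
    String getAppId() {
        return params.get(APP_ID_KEY);
    }

    // Remove the temporary entry once the value has been consumed; returns the removed value.
    String removeAppId() {
        return params.remove(APP_ID_KEY);
    }
}
```

An operator that needs the value would call removeAppId() before passing the params on, so the extra entry does not leak into downstream output.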



FYI, failed attempt to do this:

The only related material I could find was the following (not much help, but it mentions the Optional annotation):


When I set @FieldSerializer.Optional("appId") on the new field, Flink was able to restore from the old savepoint, but the new field was apparently ignored entirely: it stayed null for new instances as well.

Re: Adding a new field in java class -> restore fails with "KryoException: Unable to find class"

Stephan Ewen
Hi!

Schema evolution is a bit tricky at the moment. There is a short term and long term answer to this:

  - Long term: We store serializer configuration in the snapshots, and want to use this in the future to offer a path that converts old format to new format (read with old serializer, pass through a user-specified converter function, serialize with new serializer). Two out of three parts are in place, but it is not fully working at this point.

  - Short term: To be able to evolve state, I would recommend using something like Avro, which has schema evolution built in. Kryo is unfortunately particularly bad at class/schema evolution. To use Avro with your types, pass a "new AvroTypeInfo<>(MyClass.class)" when creating your state. You need to add "flink-avro" as a dependency in your application.
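
To make the long-term path more concrete, here is a rough plain-Java sketch of the three steps (read with the old serializer, pass through a user-specified converter, serialize with the new serializer). It is not Flink code: the `Serializer` interface, `EventV1`/`EventV2`, and `migrate` are hypothetical names invented for this illustration and do not correspond to Flink's actual serializer APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.function.Function;

final class MigrationSketch {

    // Hypothetical old and new shapes of the state type.
    static final class EventV1 {
        final String id;
        EventV1(String id) { this.id = id; }
    }

    static final class EventV2 {
        final String id;
        final String appId; // new field, may be null
        EventV2(String id, String appId) { this.id = id; this.appId = appId; }
    }

    // Minimal serializer contract for this sketch only (not Flink's TypeSerializer).
    interface Serializer<T> {
        void write(T value, DataOutput out) throws IOException;
        T read(DataInput in) throws IOException;
    }

    static final Serializer<EventV1> V1 = new Serializer<EventV1>() {
        public void write(EventV1 v, DataOutput out) throws IOException { out.writeUTF(v.id); }
        public EventV1 read(DataInput in) throws IOException { return new EventV1(in.readUTF()); }
    };

    static final Serializer<EventV2> V2 = new Serializer<EventV2>() {
        public void write(EventV2 v, DataOutput out) throws IOException {
            out.writeUTF(v.id);
            out.writeBoolean(v.appId != null); // presence flag so that null survives
            if (v.appId != null) out.writeUTF(v.appId);
        }
        public EventV2 read(DataInput in) throws IOException {
            String id = in.readUTF();
            String appId = in.readBoolean() ? in.readUTF() : null;
            return new EventV2(id, appId);
        }
    };

    static <T> byte[] toBytes(T value, Serializer<T> serializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.write(value, new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    static <T> T fromBytes(byte[] data, Serializer<T> serializer) throws IOException {
        return serializer.read(new DataInputStream(new ByteArrayInputStream(data)));
    }

    // Step 1: read with the old serializer. Step 2: convert. Step 3: write with the new serializer.
    static <A, B> byte[] migrate(byte[] oldBytes, Serializer<A> oldSerializer,
                                 Function<A, B> converter, Serializer<B> newSerializer)
            throws IOException {
        A oldValue = fromBytes(oldBytes, oldSerializer);
        B newValue = converter.apply(oldValue);
        return toBytes(newValue, newSerializer);
    }
}
```

For the case in this thread, the converter would be something like `old -> new EventV2(old.id, null)`, which gives exactly the "new fields are null after restore" behavior that was asked for.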

Best,
Stephan


On Fri, Mar 16, 2018 at 11:19 AM, Juho Autio <[hidden email]> wrote:
> Is it possible to add new fields to the object type of a stream, and then restore from savepoint?
> [...]