java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.me.flink.job.MyJob$$anon$1$$anon$4$$anon$3
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

Thanks guys, that's very helpful info!
@Aljoscha I thought I saw this exception on a job that was using the RocksDB state backend, but I'm not sure. I will do some more tests today to double check. If it's still a problem I'll try the explicit class definitions solution.
Josh
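
For readers following the thread, the explicit-class suggestion quoted below can be illustrated with a small plain-Java sketch. It is an analogue only: the Scala compiler's `$$anon$N` scheme differs in detail from javac's `$N`, and the names `AnonymousNames`, `Mapper` and `UpperCaseMapper` are made up for this example, not taken from Josh's job or from Flink. The point it shows is that anonymous classes get compiler-generated names, while an explicit class keeps the name it was declared with.

```java
import java.io.Serializable;

class AnonymousNames {
    // Hypothetical stand-in for a user function that gets Java-serialized with state.
    interface Mapper extends Serializable {
        String map(String in);
    }

    // Explicit class: its name comes from the declaration, not from a compiler counter.
    static class UpperCaseMapper implements Mapper {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String in) {
            return in.toUpperCase();
        }
    }

    // Two anonymous classes; the compiler generates a name for each of them.
    static String[] anonymousNames() {
        Mapper trim = new Mapper() {
            @Override
            public String map(String in) {
                return in.trim();
            }
        };
        Mapper reverse = new Mapper() {
            @Override
            public String map(String in) {
                return new StringBuilder(in).reverse().toString();
            }
        };
        return new String[] {trim.getClass().getName(), reverse.getClass().getName()};
    }

    // Name of the explicitly declared class.
    static String namedName() {
        return new UpperCaseMapper().getClass().getName();
    }

    public static void main(String[] args) {
        for (String name : anonymousNames()) {
            System.out.println("anonymous: " + name);
        }
        System.out.println("named:     " + namedName());
    }
}
```

Because Java serialization records class names in the stream, adding or removing an anonymous class elsewhere in the enclosing class can shift the generated names, and data written under the old name may then fail to load with a ClassNotFoundException like the one above. The explicitly named class is not affected by such changes.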
On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek <[hidden email]> wrote:

Also, you're using the FsStateBackend, correct?

Reason I'm asking is that the problem should not occur for the RocksDB state backend. There, we don't serialize any user code, only binary data. A while back I wanted to change the FsStateBackend to also work like this. Now might be a good time to actually do this. :-)

On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <[hidden email]> wrote:

Hi Josh,

you could also try to replace your anonymous classes with explicit class definitions. This should assign these classes a fixed name independent of the other anonymous classes. Then the class loader should be able to deserialize your serialized data.

Cheers,
Till

On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <[hidden email]> wrote:

Hi Josh,

I think in your case the problem is that Scala might choose different names for synthetic/generated classes. This will trip up the code that is trying to restore from a snapshot that was done with an earlier version of the code where classes were named differently.

I'm afraid I don't know how to solve this one right now, except by switching to Java.

Cheers,
Aljoscha

On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <[hidden email]> wrote:

Hi Josh,
You have to assign UIDs to all operators to change the topology. Plus,
you have to add dummy operators for all UIDs which you removed; this
is a limitation currently because Flink will attempt to find all UIDs
of the old job.
Cheers,
Max
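
The rule Max describes can be sketched as a toy model in plain Java. This is not Flink code and none of the names below (`UidRestoreSketch`, `missingUids`, the example UIDs) are Flink APIs. It only assumes, as stated above, that a restore looks up every UID recorded for the old job, so a UID that no longer exists in the new job is a problem unless a dummy operator still carries it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these names are Flink APIs.
class UidRestoreSketch {
    // Returns the savepoint UIDs that have no matching operator in the new job.
    static List<String> missingUids(Map<String, byte[]> savepoint, Set<String> newJobUids) {
        List<String> missing = new ArrayList<>();
        for (String uid : savepoint.keySet()) {
            if (!newJobUids.contains(uid)) {
                missing.add(uid);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // State of the old job, keyed by operator UID (payloads left empty here).
        Map<String, byte[]> savepoint = new LinkedHashMap<>();
        savepoint.put("source", new byte[0]);
        savepoint.put("enrich-map", new byte[0]);
        savepoint.put("sink", new byte[0]);

        // New job with the map operator removed outright.
        Set<String> withoutMap = new HashSet<>(Arrays.asList("source", "sink"));
        // New job that keeps a dummy operator under the old UID.
        Set<String> withDummy = new HashSet<>(Arrays.asList("source", "enrich-map", "sink"));

        System.out.println(missingUids(savepoint, withoutMap)); // [enrich-map]
        System.out.println(missingUids(savepoint, withDummy));  // []
    }
}
```

In this model a restore would only go ahead when the list of missing UIDs is empty, which is why removed operators need a placeholder that keeps their UID.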
On Wed, Jun 29, 2016 at 9:00 PM, Josh <[hidden email]> wrote:
> Hi all,
> Is there any information out there on how to avoid breaking saved
> states/savepoints when making changes to a Flink job and redeploying it?
>
> I want to know how to avoid exceptions like this:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>
>
> The best information I could find in the docs is here:
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>
>
> Having made the suggested changes to my job (i.e. giving a uid to every
> stateful sink and map function), what changes to the job/topology are then
> allowed/not allowed?
>
>
> If I'm 'naming' my states by providing uids, why does Flink need to look for
> a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>
>
> Thanks for any advice,
>
> Josh