http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-avoid-breaking-states-when-upgrading-Flink-job-tp7726p7787.html
state and not the partitioned RocksDB state. If you have implemented
pinpoint the anonymous class. Is it possible to share the job code?
> Ah, this might be in code that runs at a different layer from the
> StateBackend. Can you maybe pinpoint which of your user classes is this
> anonymous class and where it is used? Maybe by replacing them by
> non-anonymous classes and checking which replacement fixes the problem.
>
> -
> Aljoscha
>
> On Fri, 1 Jul 2016 at 16:27 Josh <[hidden email]> wrote:
>>
>> I've just double checked and I do still get the ClassNotFound error for an
>> anonymous class, on a job which uses the RocksDBStateBackend.
>>
>> In case it helps, this was the full stack trace:
>>
>> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassNotFoundException: com.me.flink.job.MyJob$$anon$1$$anon$4$$anon$3
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>> at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
>> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>> at java.util.ArrayList.readObject(ArrayList.java:791)
>> at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>> at java.util.HashMap.readObject(HashMap.java:1396)
>> at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>
>>
>> On Fri, Jul 1, 2016 at 10:21 AM, Josh <[hidden email]> wrote:
>>>
>>> Thanks guys, that's very helpful info!
>>>
>>> @Aljoscha I thought I saw this exception on a job that was using the
>>> RocksDB state backend, but I'm not sure. I will do some more tests today to
>>> double check. If it's still a problem I'll try the explicit class
>>> definitions solution.
>>>
>>> Josh
>>>
>>>
>>> On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek <[hidden email]> wrote:
>>>>
>>>> Also, you're using the FsStateBackend, correct?
>>>>
>>>> Reason I'm asking is that the problem should not occur for the RocksDB
>>>> state backend. There, we don't serialize any user code, only binary data. A
>>>> while back I wanted to change the FsStateBackend to also work like this. Now
>>>> might be a good time to actually do this. :-)
>>>>
>>>> On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <[hidden email]> wrote:
>>>>>
>>>>> Hi Josh,
>>>>>
>>>>> you could also try to replace your anonymous classes by explicit class
>>>>> definitions. This should assign these classes a fixed name independent of
>>>>> the other anonymous classes. Then the class loader should be able to
>>>>> deserialize your serialized data.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Josh,
>>>>>> I think in your case the problem is that Scala might choose different
>>>>>> names for synthetic/generated classes. This will trip up the code that is
>>>>>> trying to restore from a snapshot that was taken with an earlier version of
>>>>>> the code, where classes were named differently.
>>>>>>
>>>>>> I'm afraid I don't know how to solve this one right now, except by
>>>>>> switching to Java.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hi Josh,
>>>>>>>
>>>>>>> You have to assign UIDs to all operators to change the topology. Plus,
>>>>>>> you have to add dummy operators for all UIDs which you removed; this
>>>>>>> is a limitation currently because Flink will attempt to find all UIDs
>>>>>>> of the old job.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <[hidden email]> wrote:
>>>>>>> > Hi all,
>>>>>>> > Is there any information out there on how to avoid breaking saved
>>>>>>> > states/savepoints when making changes to a Flink job and
>>>>>>> > redeploying it?
>>>>>>> >
>>>>>>> > I want to know how to avoid exceptions like this:
>>>>>>> >
>>>>>>> > java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
>>>>>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>>>>>> > at java.lang.Thread.run(Thread.java:745)
>>>>>>> > Caused by: java.lang.ClassNotFoundException: com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>>>>>> >
>>>>>>> >
>>>>>>> > The best information I could find in the docs is here:
>>>>>>> >
>>>>>>> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>>>>>> >
>>>>>>> > Having made the suggested changes to my job (i.e. giving a uid to every
>>>>>>> > stateful sink and map function), what changes to the job/topology are then
>>>>>>> > allowed/not allowed?
>>>>>>> >
>>>>>>> >
>>>>>>> > If I'm 'naming' my states by providing uids, why does Flink need to look for
>>>>>>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4?
>>>>>>> >
>>>>>>> >
>>>>>>> > Thanks for any advice,
>>>>>>> >
>>>>>>> > Josh
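
The two points made in the thread (Java serialization stores class names, and anonymous classes get compiler-generated names) can be illustrated with a small, self-contained Java sketch. It does not use Flink. `StableClassNames`, `Extractor` and `LengthExtractor` are hypothetical names invented for this example. The stack trace above goes through `java.io.ObjectInputStream`, which looks classes up by the name recorded in the serialized bytes, so an anonymous class whose generated name changed between builds can no longer be resolved. javac numbers anonymous classes (`Outer$1`, `Outer$2`, ...), while the Scala compiler produces `$$anon$N` names like those in the trace; the sketch assumes the effect is analogous.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class StableClassNames {

    /** Hypothetical stand-in for a serializable user function. */
    interface Extractor extends Serializable {
        int extract(String value);
    }

    // Anonymous class: the compiler generates its name from its position
    // among the anonymous classes of the enclosing class, so adding or
    // reordering other anonymous classes can change it.
    static final Extractor ANONYMOUS = new Extractor() {
        @Override
        public int extract(String value) {
            return value.length();
        }
    };

    // Named class: the name is fixed in the source and does not depend on
    // any other class in the file.
    static final class LengthExtractor implements Extractor {
        private static final long serialVersionUID = 1L;

        @Override
        public int extract(String value) {
            return value.length();
        }
    }

    /** Serializes an object with plain Java serialization. */
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns true if the serialized form of obj contains its class name. */
    static boolean embedsClassName(Object obj) {
        // ISO-8859-1 maps every byte to one char, so ASCII names stay searchable.
        String raw = new String(serialize(obj), StandardCharsets.ISO_8859_1);
        return raw.contains(obj.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println("anonymous: " + ANONYMOUS.getClass().getName());
        System.out.println("named:     " + new LengthExtractor().getClass().getName());
        System.out.println("anonymous name in bytes: " + embedsClassName(ANONYMOUS));
        System.out.println("named name in bytes:     " + embedsClassName(new LengthExtractor()));
    }
}
```

Both serialized forms carry the class name, which is why the restore fails with a ClassNotFoundException once that name no longer exists. Only the named class keeps the same name when unrelated anonymous classes are added or removed, which is the reasoning behind Till's suggestion to use explicit class definitions.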