Thanks guys, that's very helpful info!
@Aljoscha I thought I saw this exception on a job that was using the RocksDB state backend, but I'm not sure. I will do some more tests today to double check. If it's still a problem I'll try the explicit class definitions solution.
Josh
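For anyone following along, here is a minimal plain-Java sketch of why explicit class definitions help. It is not Flink code, and the class and method names (AnonymousNaming, Doubler, anonymousIncrement, anonymousNegate) are made up for illustration. It assumes the state is written with Java serialization, which records the class name, so a restore can only succeed if a class with that exact name still exists. The Scala names in the exception above (MyJob$$anon$1$$anon$7$$anon$4) are generated in a similar positional way.

```java
import java.util.function.Function;

public class AnonymousNaming {

    // Explicitly named nested class: its binary name is always
    // AnonymousNaming$Doubler, no matter what else is added to this file.
    static class Doubler implements Function<Integer, Integer> {
        @Override
        public Integer apply(Integer x) {
            return x * 2;
        }
    }

    // Anonymous class: the compiler generates the name. javac numbers
    // anonymous classes in source order (AnonymousNaming$1, $2, ...), so
    // adding or removing one earlier in the file shifts the later names.
    static Function<Integer, Integer> anonymousIncrement() {
        return new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };
    }

    // A second anonymous class, which gets a different generated name.
    static Function<Integer, Integer> anonymousNegate() {
        return new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer x) {
                return -x;
            }
        };
    }

    public static void main(String[] args) {
        // Print the runtime class names to compare stable and generated names.
        System.out.println(new Doubler().getClass().getName());
        System.out.println(anonymousIncrement().getClass().getName());
        System.out.println(anonymousNegate().getClass().getName());
    }
}
```

If the two anonymous methods swap places in the source, the generated names swap too, while Doubler keeps its name. That is the property Till's suggestion relies on.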
Also, you're using the FsStateBackend, correct?

The reason I'm asking is that the problem should not occur for the RocksDB state backend. There, we don't serialize any user code, only binary data. A while back I wanted to change the FsStateBackend to also work like this. Now might be a good time to actually do this. :-)

On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <[hidden email]> wrote:

Hi Josh,

you could also try to replace your anonymous classes with explicit class definitions. This should assign these classes a fixed name independent of the other anonymous classes. Then the class loader should be able to deserialize your serialized data.

Cheers,
Till

On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <[hidden email]> wrote:

Hi Josh,

I think in your case the problem is that Scala might choose different names for synthetic/generated classes. This will trip up the code that is trying to restore from a snapshot that was done with an earlier version of the code where classes were named differently.

I'm afraid I don't know how to solve this one right now, except by switching to Java.

Cheers,
Aljoscha

On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <[hidden email]> wrote:

Hi Josh,
You have to assign UIDs to all operators to change the topology. Plus,
you have to add dummy operators for all UIDs which you removed; this
is a limitation currently because Flink will attempt to find all UIDs
of the old job.
Cheers,
Max
On Wed, Jun 29, 2016 at 9:00 PM, Josh <[hidden email]> wrote:
> Hi all,
> Is there any information out there on how to avoid breaking saved
> states/savepoints when making changes to a Flink job and redeploying it?
>
> I want to know how to avoid exceptions like this:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>
>
> The best information I could find in the docs is here:
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>
>
> Having made the suggested changes to my job (i.e. giving a uid to every
> stateful sink and map function), what changes to the job/topology are then
> allowed/not allowed?
>
>
> If I'm 'naming' my states by providing uids, why does Flink need to look for
> a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>
>
> Thanks for any advice,
>
> Josh