Exception for Scala anonymous class when restoring from state

Kien Truong
Hi,

After some refactoring (moving some operators into separate functions/files), I'm encountering a lot of exceptions like the one below when restoring from state. The logic of the application did not change, and all the refactored operators are stateless, e.g. simple map/flatMap/filter.

Does anyone know how to fix/avoid/work around this?

I'm using FsStateBackend on Flink 1.3.2.

Regards,
Kien

java.lang.ClassNotFoundException: x.x.X$$anon$113$$anon$55
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:321)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

Re: Exception for Scala anonymous class when restoring from state

Tzu-Li (Gordon) Tai
Hi Kien,

Thank you for reporting this.
In general, this is currently a limitation of the Scala API. With the Scala API, type serializers are generated as anonymous classes, so their class names are sensitive to the ordering, the enclosing class, the Scala version, and so on at the point where they are generated.
We’re discussing a fix for this at the moment; I’ll get back to you once we figure that out.
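
To illustrate the naming issue outside of Flink, here is a minimal sketch in plain Scala. `FakeSerializer` and `AnonNaming` are hypothetical names made up for this example; they are not Flink types. The compiler typically names an anonymous class after its enclosing class plus a counter, which is the same shape as `x.x.X$$anon$113$$anon$55` in the stack trace, so adding, removing, or moving one anonymous class can change the names of the others.

```scala
// Hypothetical stand-in for a generated serializer; not a Flink type.
trait FakeSerializer {
  def describe: String
}

object AnonNaming {
  // Two anonymous subclasses declared inside the same enclosing object.
  // Each one is compiled to its own class whose name is derived from the
  // enclosing object and a compiler-assigned counter.
  val first: FakeSerializer = new FakeSerializer { def describe: String = "first" }
  val second: FakeSerializer = new FakeSerializer { def describe: String = "second" }

  def main(args: Array[String]): Unit = {
    // Prints the generated class names, which have the form AnonNaming$$anon$N.
    println(first.getClass.getName)
    println(second.getClass.getName)
  }
}
```

If the two declarations are moved into a different enclosing object, the printed names would be expected to change even though the code inside them is identical. A snapshot that recorded the old name through Java serialization can then no longer resolve it, which matches the `ClassNotFoundException` raised from `ObjectInputStream` in the trace.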

As for the case you’ve bumped into here, the restore seems to be failing on one of the key serializers when restoring the timer service.
Did you happen to also refactor classes that are used as keys? As a workaround for now, you could try making sure that the key classes are left untouched.

Please keep us updated on how this works out for you; I’ll continue to look into it.

Thanks,
Gordon

(In reply to Kien Truong's message of 17 August 2017, 10:46:05 AM.)

Re: Exception for Scala anonymous class when restoring from state

Kien Truong
Hi,

Our state keys are all inline tuples, so they get moved around during refactoring. I guess Scala compiles them to anonymous classes. I will switch to named classes, just in case.
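
For reference, a rough sketch of what that switch could look like, assuming the Flink Scala streaming API is on the classpath. `Reading`, `SensorKey`, and `NamedKeyJob` are hypothetical names invented for this example and do not come from the actual job. Note that the serializer for `SensorKey` is still derived implicitly at the `keyBy` call site, so this change alone may not be enough to make restores survive refactoring; nothing in this thread confirms that it does.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical event and key types, made up for this sketch.
case class Reading(site: String, sensorId: Int, value: Double)
case class SensorKey(site: String, sensorId: Int)

object NamedKeyJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val readings: DataStream[Reading] = env.fromElements(
      Reading("a", 1, 1.0),
      Reading("a", 1, 2.5),
      Reading("b", 7, 4.0))

    readings
      // Previously an inline tuple key: .keyBy(r => (r.site, r.sensorId))
      // Now a named case class defined once at the top level.
      .keyBy(r => SensorKey(r.site, r.sensorId))
      // Running sum of values per key.
      .reduce((a, b) => a.copy(value = a.value + b.value))
      .print()

    env.execute("named-key-sketch")
  }
}
```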

Best regards,
Kien
(In reply to Tzu-Li (Gordon) Tai's message of Aug 18, 2017, 11:09.)