flink kryo exception

flink kryo exception

yidan zhao
I have a job whose checkpoints and savepoints all work fine.
However, if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more

Re: flink kryo exception

Till Rohrmann
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till
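
For context, the "Caused by" part of the trace shows Kryo's DefaultClassResolver.readName handing the name it read from the stream to Class.forName. A minimal JDK-only sketch of that lookup follows; the class ClassLookupSketch and the helper canLoad are hypothetical names used for illustration and are not part of Flink or Kryo.

```java
final class ClassLookupSketch {
    /** Returns true if the given name resolves to a class through the supplied loader. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            // Class.forName is the JDK call that appears in the "Caused by" frames.
            // initialize=false: we only want to know whether the class can be found.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassLookupSketch.class.getClassLoader();
        System.out.println(canLoad("java.util.HashMap", loader)); // true
        System.out.println(canLoad("g^XT", loader));              // false
    }
}
```

In this sketch the second lookup fails simply because the loader knows no class with that name.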

Re: flink kryo exception

yidan zhao
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

Re: flink kryo exception

yidan zhao
Here are some facts that may be related, since another job does not hit these exceptions.
The problematic job uses a class that contains a field of class MapRecord, and MapRecord is defined to extend HashMap so that it can accept variable JSON data.

Class MapRecord:
    @NoArgsConstructor
    @Slf4j
    public class MapRecord extends HashMap<Object, Object> implements Serializable {
        @Override
        public void setTimestamp(Long timestamp) {
            put("timestamp", timestamp);
            put("server_time", timestamp);
        }

        @Override
        public Long getTimestamp() {
            try {
                Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
                return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
            } catch (Exception e) {
                log.error("Error, MapRecord's timestamp invalid.", e);
                return 0L;
            }
        }
    }

Class UserAccessLog:
    public class UserAccessLog extends AbstractRecord<UserAccessLog> {
        private MapRecord d; // I think this is related to the problem...
        ... ...
    }
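
To illustrate why a field like d is harder to serialize than a fixed-schema object: a HashMap<Object, Object> can hold keys and values of different runtime types, so a generic serializer has to work out the type of every key and value as it goes (the MapSerializer.read -> readClassAndObject frames in the traces above are consistent with that). A small JDK-only sketch follows; HeterogeneousMapSketch and describe are hypothetical names, and the sample entries are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HeterogeneousMapSketch {
    /** Lists "key -> runtime class of value" for every entry, sorted by key for stable output. */
    static List<String> describe(Map<Object, Object> record) {
        Map<String, String> sorted = new TreeMap<>();
        for (Map.Entry<Object, Object> e : record.entrySet()) {
            Object value = e.getValue();
            String type = (value == null) ? "null" : value.getClass().getName();
            sorted.put(String.valueOf(e.getKey()), type);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : sorted.entrySet()) {
            out.add(e.getKey() + " -> " + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up entries standing in for "variable JSON data".
        Map<Object, Object> record = new HashMap<>();
        record.put("timestamp", 1000L);
        record.put("uid", "u-1");
        record.put("tags", new ArrayList<>(Arrays.asList("a", "b")));
        for (String line : describe(record)) {
            System.out.println(line);
        }
    }
}
```

Each entry reports a different value class, which is the kind of per-entry type information a fixed set of typed fields would not need.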

赵一旦 <[hidden email]> 于2021年2月3日周三 下午6:43写道:
Actually the exception is different every time I stop the job.
Such as:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack as I given above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

Till Rohrmann <[hidden email]> 于2021年2月3日周三 下午6:28写道:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job, the checkpoint and savepoint all right.
But, if I stop the job using 'stop -p', after the savepoint generated, then the job goes to fail. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more


Re: flink kryo exception

Till Rohrmann
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Here are some facts that are possibly related, since another job does not run into these exceptions.
The problem job uses a class that contains a field of class MapRecord, and MapRecord is defined to extend HashMap so that it can accept variable JSON data.

Class MapRecord:
@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}

Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till



Re: flink kryo exception

yidan zhao
Hi all, I found that the failure always occurs in the second task, the one right after the source task. So, in the first chained task, I now convert the 'Map'-based object into an object of a normal class, and the problem disappeared.

With this change in place, I also tried stopping the job and restoring it from the savepoint (both succeeded).
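
The workaround described above, converting the Map-based object into a plain class in the first chained task, could look roughly like the sketch below. The class name `AccessRecord`, its `uid` field, and the `fromMap` helper are hypothetical, since the thread does not show the real class. The timestamp fallback mirrors `MapRecord.getTimestamp()`. Flink generally treats a public class with a public no-argument constructor and public (or getter/setter) fields as a POJO, so fields typed like these should not need to fall back to Kryo.

```java
import java.util.HashMap;
import java.util.Map;

public class MapToPojoSketch {

    /** Hypothetical replacement for the Map-based record: fixed, explicitly typed fields. */
    public static class AccessRecord {
        public long timestamp;
        public String uid;

        // Public no-argument constructor, as Flink's POJO rules require.
        public AccessRecord() {}
    }

    /** Copies the known keys out of the raw map; unknown keys are dropped. */
    public static AccessRecord fromMap(Map<Object, Object> raw) {
        AccessRecord r = new AccessRecord();

        // Same fallback order as MapRecord.getTimestamp(): "timestamp", then "server_time", then 0.
        Object ts = raw.getOrDefault("timestamp", raw.getOrDefault("server_time", 0L));
        r.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;

        // "uid" is an assumed key, used here only for illustration.
        Object uid = raw.get("uid");
        r.uid = (uid == null) ? null : uid.toString();
        return r;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612340000000L);
        raw.put("uid", 42);

        AccessRecord r = fromMap(raw);
        System.out.println(r.timestamp + " " + r.uid);
    }
}
```

In a DataStream job, a helper like `fromMap` would be called from the map function that sits in the same chain as the source, so that only the typed record crosses the network to the next task.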

However, I then ran into another problem. It also occurs when I stop the job, and again in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is again about serialization and deserialization, but it is not related to Kryo this time.
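
An `EOFException` during deserialization generally means the reader ran out of bytes before the value it was decoding was complete. The sketch below reproduces that effect with plain JDK streams (`DataOutputStream.writeUTF` and `DataInputStream.readUTF`), not with Flink's `StringSerializer`, whose wire format is different. The class and method names are made up for the example. It only illustrates the failure mode and does not establish what went wrong in the job above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class TruncatedRecordSketch {

    /** Writes the string in the JDK's length-prefixed modified UTF-8 format. */
    public static byte[] encode(String value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(value);
        }
        return buffer.toByteArray();
    }

    /** Reads one length-prefixed string; fails if the bytes end too early. */
    public static String decode(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] full = encode("server_time");
        System.out.println(decode(full));

        // Drop the last three bytes: the length prefix now promises more data than exists.
        byte[] truncated = Arrays.copyOf(full, full.length - 3);
        try {
            decode(truncated);
        } catch (EOFException e) {
            System.out.println("EOFException on truncated input");
        }
    }
}
```

The same reasoning applies to any length-prefixed binary format: if a reader starts or stops at the wrong position in the stream, the failure it reports depends on which bytes it happens to interpret.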




Re: flink kryo exception

yidan zhao
I no longer think this is a problem in my code. Maybe it is a bug?

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

Till Rohrmann <[hidden email]> wrote on Wed, Feb 3, 2021 at 6:28 PM:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till


Re: flink kryo exception

rmetzger0
Are you using unaligned checkpoints? (There's a known bug in 1.12.0 which can lead to corrupted data when using unaligned checkpoints.)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, and what kind of job is it? I guess DataStream API.)

Somehow the process receiving the data is unable to deserialize it, most likely because sender and receiver are configured differently (different classpath, dependency versions, etc.).

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I no longer think this is a problem in my code; maybe it is a bug?

赵一旦 <[hidden email]> wrote on Fri, Feb 5, 2021 at 4:30 PM:
Hi all, I found that the failure always occurs in the second task, the one right after the source task. So I changed the first chained task to transform the Map-based object into an ordinary class object, and the problem disappeared.

With this new solution, I also tried stopping the job and restoring it from a savepoint (all successful).
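
A minimal sketch of the kind of conversion described above, written in plain Java. The class name `AccessLogPojo` and the `uid` field are hypothetical and do not come from the thread; only the `timestamp` / `server_time` fallback mirrors the `MapRecord` code quoted in this thread. The idea is to copy the needed entries into typed public fields before the record leaves the first chained task.

```java
import java.util.HashMap;
import java.util.Map;

class MapToPojoSketch {

    /** Hypothetical plain record; the field names are illustrative only. */
    public static class AccessLogPojo {
        public String uid;
        public long timestamp;

        // Public no-argument constructor, as expected of a simple POJO.
        public AccessLogPojo() {}
    }

    /** Copies the needed entries out of the map-based record into typed fields. */
    static AccessLogPojo fromMap(Map<Object, Object> raw) {
        AccessLogPojo pojo = new AccessLogPojo();
        Object uid = raw.get("uid");
        pojo.uid = (uid == null) ? "" : uid.toString();
        // Same fallback order as MapRecord.getTimestamp(): "timestamp", then "server_time", then 0.
        Object ts = raw.getOrDefault("timestamp", raw.getOrDefault("server_time", 0L));
        pojo.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;
        return pojo;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("uid", "u1");
        raw.put("server_time", 1612513286000L);
        AccessLogPojo pojo = fromMap(raw);
        System.out.println(pojo.uid + " @ " + pojo.timestamp);
    }
}
```

In a real job the conversion would sit inside whatever function the first chained operator already runs; how that operator is wired up is not shown in the thread, so it is left out here.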

But I then ran into another problem. It also occurs when I stop the job, and also in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is again about serialization and deserialization, but not related to Kryo this time.
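
For illustration only: the sketch below uses plain `java.io` streams (not Flink's or Kryo's actual wire format) to show how a reader that receives damaged or incomplete bytes can fail in different ways depending on where the damage is. The class `DamagedRecordDemo` and its record layout are assumptions made up for this example; it is not a diagnosis of the job in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

class DamagedRecordDemo {

    /** Writes one record: a length-prefixed class name followed by an 8-byte payload. */
    static byte[] writeRecord(String className, long payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(className);
            out.writeLong(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the record back and reports either the payload or the kind of failure. */
    static String readRecord(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String className = in.readUTF();
            Class.forName(className);      // fails if the name bytes were damaged
            return "ok:" + in.readLong();  // fails if the input ends too early
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException";
        } catch (EOFException e) {
            return "EOFException";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        byte[] intact = writeRecord("java.lang.String", 42L);

        // Drop the last three bytes: the class name is readable, the payload is not.
        byte[] truncated = Arrays.copyOf(intact, intact.length - 3);

        // Overwrite the first character of the class name ('j' becomes 'g').
        byte[] corrupted = intact.clone();
        corrupted[2] = (byte) 'g';

        System.out.println(readRecord(intact));     // ok:42
        System.out.println(readRecord(truncated));  // EOFException
        System.out.println(readRecord(corrupted));  // ClassNotFoundException
    }
}
```

The same bytes and the same reader produce a class lookup failure in one case and an end-of-input failure in the other, which is at least consistent with seeing a different exception on each run.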


Till Rohrmann <[hidden email]> wrote on Wed, Feb 3, 2021 at 9:22 PM:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till


Re: flink kryo exception

yidan zhao
Flink 1.12.0; only aligned checkpoints are used; standalone cluster.



Robert Metzger <[hidden email]> 于2021年2月5日周五 下午6:52写道:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I do not think this is some code related problem anymore, maybe it is some bug?

赵一旦 <[hidden email]> 于2021年2月5日周五 下午4:30写道:
Hi all, I find that the failure always occurred in the second task, after the source task. So I do something in the first chaining task, I transform the 'Map' based class object to another normal class object, and the problem disappeared.

Based on the new solution, I also tried to stop and restore job with savepoint (all successful).

But, I also met another problem. Also this problem occurs while I stop the job, and also occurs in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also about serialize and deserialize, but not related to kryo this time.


Till Rohrmann <[hidden email]> 于2021年2月3日周三 下午9:22写道:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Some facts are possibly related with these, since another job do not meet these expectations.
The problem job use a class which contains a field of Class MapRecord, and MapRecord is defined to extend HashMap so as to accept variable json data.

Class MapRecord:
@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
@Override
public void setTimestamp(Long timestamp) {
put("timestamp", timestamp);
put("server_time", timestamp);
}

@Override
public Long getTimestamp() {
try {
Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
} catch (Exception e) {
log.error("Error, MapRecord's timestamp invalid.", e);
return 0L;
}
}
}
Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
private MapRecord d; // I think this is related to the problem...
    ... ...
}

赵一旦 <[hidden email]> 于2021年2月3日周三 下午6:43写道:
Actually the exception is different every time I stop the job.
Such as:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack as I given above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints all work fine.
But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_DefaultPassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more


Re: flink kryo exception

rmetzger0
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)
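
One way to compare the /lib folders is to print a checksum per jar on every machine and diff the output. The following is only a minimal sketch using the JDK standard library; the class name JarDigests and the default folder name "lib" are assumptions made for illustration, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper: lists SHA-256 digests of the jars in one directory.
public class JarDigests {

    /** Returns jar file name -> SHA-256 hex digest for every *.jar directly inside libDir. */
    static Map<String, String> digestJars(Path libDir) throws IOException, NoSuchAlgorithmException {
        Map<String, String> digests = new TreeMap<>(); // sorted, so outputs can be diffed
        try (Stream<Path> files = Files.list(libDir)) {
            for (Path p : (Iterable<Path>) files::iterator) {
                String name = p.getFileName().toString();
                if (Files.isRegularFile(p) && name.endsWith(".jar")) {
                    digests.put(name, sha256Hex(Files.readAllBytes(p)));
                }
            }
        }
        return digests;
    }

    /** Hex-encodes the SHA-256 digest of the given bytes. */
    static String sha256Hex(byte[] data) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest(data)) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the folder to check is passed as the first argument, else "lib".
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        digestJars(libDir).forEach((name, digest) -> System.out.println(digest + "  " + name));
    }
}
```

Running it with the same folder argument on each TaskManager and JobManager host and comparing the printed lines should show whether any jar differs or is missing.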


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink 1.12.0; aligned checkpoints only; standalone cluster.



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <[hidden email]> wrote:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because the sending and receiving processes are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I do not think this is a code-related problem anymore; maybe it is a bug?

On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <[hidden email]> wrote:
Hi all, I found that the failure always occurs in the second task, the one right after the source task. So I changed the first chained task to transform the Map-based class object into a normal class object, and the problem disappeared.

With this change, I also tried stopping the job and restoring it from a savepoint (all successful).
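
The conversion described above could look roughly like the following. This is only a sketch: the class name FlatAccessRecord and the fields uid and subid are hypothetical (the thread does not show the real class), while the "timestamp" and "server_time" keys are taken from the MapRecord code quoted further down. The class has a public no-arg constructor and public fields, which is the shape Flink's documentation describes for POJO types, so it is meant to avoid the generic Kryo path used for a HashMap subclass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed replacement for a Map-based record.
public class FlatAccessRecord {
    // Public fields plus a public no-arg constructor give the class a POJO-like shape.
    public long timestamp;
    public String uid;    // hypothetical field
    public String subid;  // hypothetical field

    public FlatAccessRecord() {}

    /** Copies the known keys out of a loosely typed map; unknown keys are dropped. */
    public static FlatAccessRecord fromMap(Map<Object, Object> raw) {
        FlatAccessRecord r = new FlatAccessRecord();
        Object ts = raw.getOrDefault("timestamp", raw.getOrDefault("server_time", 0L));
        // Fall back to 0 when the value is missing, null, or not numeric.
        r.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;
        r.uid = asString(raw.get("uid"));
        r.subid = asString(raw.get("subid"));
        return r;
    }

    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612513286000L);
        raw.put("uid", 42);
        FlatAccessRecord r = fromMap(raw);
        System.out.println(r.timestamp + " " + r.uid + " " + r.subid);
    }
}
```

In a job, fromMap would be called inside the first chained operator, so that only the typed record crosses the network to the second task.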

However, I then hit another problem. It also occurs when I stop the job, and again in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also about serialization and deserialization, but not related to Kryo this time.


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <[hidden email]> wrote:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Some facts that are possibly related, since another job does not hit these exceptions.
The problem job uses a class that contains a field of class MapRecord, and MapRecord is defined to extend HashMap so that it can accept variable JSON data.

Class MapRecord:
@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}

Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints all work fine.
But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_DefaultPassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more


Re: flink kryo exception

yidan zhao
Yeah, and if they were different, why would my job run normally? The problem only occurs when I stop it.

On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <[hidden email]> wrote:
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink 1.12.0; aligned checkpoints only; standalone cluster.



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <[hidden email]> wrote:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because the sending and receiving processes are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I do not think this is a code-related problem anymore; maybe it is a bug?

On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <[hidden email]> wrote:
Hi all, I found that the failure always occurs in the second task, the one right after the source task. So I changed the first chained task to transform the Map-based class object into a normal class object, and the problem disappeared.

With this change, I also tried stopping the job and restoring it from a savepoint (all successful).

However, I then hit another problem. It also occurs when I stop the job, and again in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also about serialization and deserialization, but not related to Kryo this time.


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <[hidden email]> wrote:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Some facts that are possibly related, since another job does not hit these exceptions.
The problem job uses a class that contains a field of class MapRecord, and MapRecord is defined to extend HashMap so that it can accept variable JSON data.

Class MapRecord:
@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}

Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints work fine.
However, if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more

Re: flink kryo exception

Till Rohrmann
Could you provide us with a minimal working example which reproduces the problem for you? This would be super helpful in figuring out the problem you are experiencing. Thanks a lot for your help.

Cheers,
Till

On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <[hidden email]> wrote:
Yeah, but if they were different, why would my job run normally? The problem only occurs when I stop it.

On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <[hidden email]> wrote:
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)
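
One way to check this is to print a checksum for every jar in the lib folder on each machine and compare the output. The following is a minimal sketch in plain Java; the class name `LibChecksums` and the default folder name `lib` are assumptions for illustration, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class LibChecksums {

    /** Returns file name -> SHA-256 hex digest for every *.jar directly inside dir. */
    static Map<String, String> jarDigests(Path dir) throws IOException, NoSuchAlgorithmException {
        Map<String, String> digests = new TreeMap<>(); // sorted, so outputs are easy to diff
        try (Stream<Path> files = Files.list(dir)) {
            for (Path p : (Iterable<Path>) files::iterator) {
                String name = p.getFileName().toString();
                if (Files.isRegularFile(p) && name.endsWith(".jar")) {
                    digests.put(name, sha256Hex(Files.readAllBytes(p)));
                }
            }
        }
        return digests;
    }

    /** Hex-encoded SHA-256 digest of the given bytes. */
    static String sha256Hex(byte[] data) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest(data)) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Pass the lib folder as the first argument; "lib" is only a placeholder default.
        Path lib = Paths.get(args.length > 0 ? args[0] : "lib");
        jarDigests(lib).forEach((name, digest) -> System.out.println(digest + "  " + name));
    }
}
```

Running it on every node and diffing the printed lines should show quickly whether any jar differs between machines.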


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink 1.12.0; only aligned checkpoints; standalone cluster.



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <[hidden email]> wrote:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I no longer think this is a problem in my code; maybe it is a bug?

On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <[hidden email]> wrote:
Hi all, I found that the failure always occurs in the second task, the one after the source task. So in the first chained task I now convert the 'Map'-based object into an ordinary class object, and the problem disappeared.

With the new solution, I also tried stopping the job and restoring it from a savepoint (both succeeded).
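
For readers following along, the workaround described above can be sketched roughly like this in plain Java. This is only an illustration under assumptions: the record class `AccessRecord`, its fields, and the map keys `uid` and `subid` are hypothetical and not taken from the original job. The shape of the class (public, public no-argument constructor, public fields) follows the rules Flink documents for POJO types, which is what lets Flink avoid the generic Kryo path for it.

```java
import java.util.HashMap;
import java.util.Map;

public class MapToPojoSketch {

    /** Hypothetical flat record; the field names are illustrative only. */
    public static class AccessRecord {
        public long timestamp;
        public String uid;
        public String subId;

        public AccessRecord() {} // public no-arg constructor, as POJO rules expect
    }

    /** Converts the schemaless map into the typed record before it leaves the first task. */
    static AccessRecord fromMap(Map<Object, Object> raw) {
        AccessRecord r = new AccessRecord();
        // Fall back to "server_time" when "timestamp" is absent, then to 0.
        Object ts = raw.containsKey("timestamp") ? raw.get("timestamp") : raw.get("server_time");
        r.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;
        r.uid = asString(raw.get("uid"));
        r.subId = asString(raw.get("subid"));
        return r;
    }

    static String asString(Object o) {
        return o == null ? null : o.toString();
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612513286000L);
        raw.put("uid", "u-1");
        AccessRecord r = fromMap(raw);
        System.out.println(r.timestamp + " " + r.uid + " " + r.subId);
    }
}
```

In a real job, a function like `fromMap` would be called from a map operator chained to the source, so that only the typed record crosses the network to the second task.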

However, I then hit another problem. It also occurs when I stop the job, and also in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

This is also a serialization/deserialization problem, but it is not related to Kryo this time.
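
As an aside for readers, the variety of exceptions in this thread is what one would expect if the reader sees a truncated or shifted byte stream: the same bytes can then surface as an EOFException, an unknown type ID, or other errors. The toy example below illustrates this in plain Java. The wire format (one byte type ID followed by a 4-byte int) is an assumption made up for the demonstration; it is not Kryo's or Flink's actual format, and it does not show what caused the failure in this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class MisalignedReadDemo {

    // Toy wire format (NOT Kryo's or Flink's): 1 byte type ID, then a 4-byte int payload.
    static final int KNOWN_TYPE_ID = 1;

    static byte[] write(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(KNOWN_TYPE_ID);
            out.writeInt(value);
        }
        return bytes.toByteArray();
    }

    static int read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int typeId = in.readUnsignedByte();
        if (typeId != KNOWN_TYPE_ID) {
            throw new IllegalStateException("Encountered unregistered type ID: " + typeId);
        }
        return in.readInt();
    }

    /** Reads the record and reports either the value or the kind of failure. */
    static String describe(byte[] data) {
        try {
            return "ok: " + read(data);
        } catch (EOFException e) {
            return "EOFException";
        } catch (IOException | IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] record = write(96); // bytes: 01 00 00 00 60
        System.out.println(describe(record));                          // ok: 96
        System.out.println(describe(Arrays.copyOfRange(record, 0, 3))); // EOFException (truncated)
        System.out.println(describe(Arrays.copyOfRange(record, 4, 5))); // Encountered unregistered type ID: 96 (shifted)
    }
}
```

The point of the sketch is only that one underlying cause (the reader and the bytes being out of step) can produce several different-looking errors.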


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <[hidden email]> wrote:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Here are some facts that may be related, since another job does not hit these exceptions.
The failing job uses a class that contains a field of type MapRecord, and MapRecord extends HashMap so that it can accept variable JSON data.

Class MapRecord:
@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}

Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
The exception is actually different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is as given above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints work fine.
However, if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more

Re: flink kryo exception

yidan zhao
The first problem is critical, since the savepoint does not work.
The second problem arises with the changed solution, in which I removed the 'Map'-based implementation before the data are transferred to the second task; in this case the savepoint works. The only remaining issue is the procedure: normally I would stop the job, note the savepoint path, and then restart the job from that path. What happens now is that I stop the job, the job fails, and then it restarts automatically from the generated savepoint. So I no longer need to restart the job myself, since what it does automatically is what I wanted to do anyway.

I suspect it may also be related to the data, so I am not sure I can provide an example that reproduces the problem.

On Sat, Feb 6, 2021 at 12:13 AM Till Rohrmann <[hidden email]> wrote:
Could you provide us with a minimal working example which reproduces the problem for you? This would be super helpful in figuring out the problem you are experiencing. Thanks a lot for your help.

Cheers,
Till

On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <[hidden email]> wrote:
Yeah, but if they were different, why would my job run normally? The problem only occurs when I stop it.

On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <[hidden email]> wrote:
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink 1.12.0; only aligned checkpoints; standalone cluster.



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <[hidden email]> wrote:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I no longer think this is a problem in my code; maybe it is a bug?

On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <[hidden email]> wrote:
Hi all, I found that the failure always occurs in the second task, the one after the source task. So in the first chained task I now convert the 'Map'-based object into an ordinary class object, and the problem disappeared.

With the new solution, I also tried stopping the job and restoring it from a savepoint (both succeeded).

However, I then hit another problem. It also occurs when I stop the job, and also in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

This is also a serialization/deserialization problem, but it is not related to Kryo this time.


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <[hidden email]> wrote:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Here are some facts that may be related, since another job does not hit these exceptions.
The failing job uses a class that contains a field of type MapRecord, and MapRecord extends HashMap so that it can accept variable JSON data.

Class MapRecord:
@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}

Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
The exception is actually different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is as given above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints all work fine.
But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_DefaultPassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more


Re: flink kryo exception

yidan zhao
It may also have something to do with my job's first tasks. The second task has two inputs: one is the Kafka source stream (A), the other is a self-defined MySQL source used as a broadcast stream (B).
In A: I have a 'WatermarkReAssigner', a self-defined operator which adds an offset to its input watermark and then forwards it downstream.
In B: The parallelism is 30, but in my rich function's implementation only subtask 0 runs the MySQL query and sends out records; the other subtasks do nothing. All subtasks send max_watermark - 86400_000 as the watermark.
Since both of the first tasks have some self-defined source or implementation, I do not know whether the problem has something to do with them.
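
For readers following along, the watermark arrangement described above can be pictured with a small plain-Java sketch. It does not use the Flink API and is not the poster's actual operator: the class `WatermarkSketch` and its method names are made up for illustration. It assumes that "max_watermark" means `Long.MAX_VALUE` (the value Flink uses for its final watermark) and relies on the fact that a two-input task advances its event time to the minimum of its inputs' watermarks.

```java
class WatermarkSketch {
    // Assumption: "max_watermark" in the post refers to Long.MAX_VALUE.
    static final long MAX_WATERMARK = Long.MAX_VALUE;
    static final long ONE_DAY_MS = 86_400_000L;

    /** Stream A: shift the incoming watermark by a fixed offset (may be negative). */
    static long reassign(long inputWatermark, long offsetMs) {
        return inputWatermark + offsetMs;
    }

    /** Stream B: every subtask emits the same constant, one day below the maximum. */
    static long broadcastWatermark() {
        return MAX_WATERMARK - ONE_DAY_MS;
    }

    /** A two-input task follows the slower (smaller) of its two input watermarks. */
    static long combined(long watermarkA, long watermarkB) {
        return Math.min(watermarkA, watermarkB);
    }

    public static void main(String[] args) {
        long a = reassign(1_612_342_435_179L, -5_000L); // hypothetical offset of -5 s
        long b = broadcastWatermark();
        System.out.println("combined watermark = " + combined(a, b));
    }
}
```

With B pinned that high, the combined watermark is effectively driven by stream A alone, which seems to be the intent of the setup.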

On Sun, Feb 7, 2021 at 4:05 PM 赵一旦 <[hidden email]> wrote:
The first problem is critical, since the savepoint does not work.
For the second problem, I changed the solution: I removed the 'Map'-based implementation before the data is transferred to the second task, and in this case the savepoint works. The only issue is that I expected to stop the job, note the savepoint path, and then restart the job from that path. What happens now is: I stop the job, then the job fails and restarts automatically from the generated savepoint. So I do not need to restart the job myself anymore, since what it does automatically is what I wanted to do.

I suspect it may also be related to the data, so I am not sure that I can provide an example that reproduces the problem.

On Sat, Feb 6, 2021 at 12:13 AM Till Rohrmann <[hidden email]> wrote:
Could you provide us with a minimal working example which reproduces the problem for you? This would be super helpful in figuring out the problem you are experiencing. Thanks a lot for your help.

Cheers,
Till

On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <[hidden email]> wrote:
Yeah, and if they were different, why would my job run normally? The problem only occurs when I stop it.

On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <[hidden email]> wrote:
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)
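
One way to check this is to print a checksum per jar on each machine and diff the output. Below is a minimal sketch, assuming the jars sit directly inside one directory. `LibChecksums` and the default `lib` path are placeholders for illustration, not part of Flink's tooling.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

class LibChecksums {
    /** Returns file name -> SHA-256 hex digest for every *.jar directly inside dir. */
    static Map<String, String> sha256OfJars(Path dir) throws IOException, NoSuchAlgorithmException {
        Map<String, String> result = new TreeMap<>(); // sorted, so outputs diff cleanly
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                result.put(jar.getFileName().toString(), sha256(jar));
            }
        }
        return result;
    }

    private static String sha256(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        try (InputStream in = new DigestInputStream(Files.newInputStream(file), digest)) {
            byte[] buffer = new byte[8192];
            while (in.read(buffer) != -1) {
                // Reading drives the digest; the bytes themselves are not needed.
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // "lib" is only a placeholder; pass the real lib directory as the first argument.
        Path lib = Paths.get(args.length > 0 ? args[0] : "lib");
        sha256OfJars(lib).forEach((name, hash) -> System.out.println(hash + "  " + name));
    }
}
```

Running it on every TaskManager and JobManager host and comparing the printed lines would show any jar that differs or is missing on one machine.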


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink 1.12.0; only using aligned checkpoints; standalone cluster.



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <[hidden email]> wrote:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I do not think this is a code-related problem anymore; maybe it is a bug?

On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <[hidden email]> wrote:
Hi all, I found that the failure always occurs in the second task, after the source task. So I changed something in the first chained task: I transform the 'Map'-based class object into another, normal class object, and the problem disappeared.

Based on the new solution, I also tried to stop the job and restore it from a savepoint (all successful).
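
The workaround described here, turning the 'Map'-based object into a normal class before it crosses the network, might look roughly like the sketch below. This is not the poster's code: `FlatAccessRecord`, its fields, and the `uid` / `sub_id` keys are hypothetical. Only the `timestamp` / `server_time` fallback mirrors the `MapRecord` snippet quoted in this thread. In a real job the class would be declared `public` in its own file so that Flink can treat it as a POJO.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical flat record; field names are illustrative, not from the original job. */
class FlatAccessRecord {
    // Typed public fields plus a public no-arg constructor, in the spirit of Flink's POJO rules.
    public long timestamp;
    public String uid;
    public String subId;

    public FlatAccessRecord() {}

    /** Copies the needed keys out of a schemaless map into typed fields. */
    static FlatAccessRecord fromMap(Map<Object, Object> raw) {
        FlatAccessRecord r = new FlatAccessRecord();
        // Prefer "timestamp", fall back to "server_time", then to 0.
        Object ts = raw.getOrDefault("timestamp", raw.getOrDefault("server_time", 0L));
        r.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;
        r.uid = stringOrNull(raw.get("uid"));       // hypothetical key
        r.subId = stringOrNull(raw.get("sub_id"));  // hypothetical key
        return r;
    }

    private static String stringOrNull(Object value) {
        return value == null ? null : value.toString();
    }
}

class FlatAccessRecordDemo {
    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612342435179L);
        raw.put("uid", 42);
        FlatAccessRecord r = FlatAccessRecord.fromMap(raw);
        System.out.println(r.timestamp + " " + r.uid + " " + r.subId);
    }
}
```

The point of the conversion is that every field has a fixed, known type, so the record no longer needs a generic fallback serializer for arbitrary map keys and values.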

But I also ran into another problem. It also occurs when I stop the job, and also in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also about serialization and deserialization, but not related to Kryo this time.


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <[hidden email]> wrote:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Some facts are possibly related to this, since another job does not hit these exceptions.
The problem job uses a class which contains a field of class MapRecord, and MapRecord is defined to extend HashMap so as to accept variable JSON data.

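Class MapRecord:
import java.io.Serializable;
import java.util.HashMap;
import java.util.Optional;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    // Note: @Override only compiles if a supertype declares these methods;
    // the interface that does so is not shown in this snippet.
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            // Prefer "timestamp", fall back to "server_time", then to 0.
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}
Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}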

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints all work fine.
But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_DefaultPassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more


Re: Re: flink kryo exception

Yun Gao
Hi yidan,

One more thing to confirm: are you creating the savepoint and stopping the job together with the

 bin/flink cancel -s [:targetDirectory] :jobId

command?

Best,
 Yun


    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints all work fine.
But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more

Re: Re: flink kryo exception

yidan zhao
Yes, but I use 'stop' rather than 'cancel', which also takes the savepoint and stops the job together.

On Mon, Feb 8, 2021 at 11:59 AM Yun Gao <[hidden email]> wrote:
Hi yidan,

One more thing to confirm: are you creating the savepoint and stopping the job together with the following command?

 bin/flink cancel -s [:targetDirectory] :jobId

Best,
 Yun


------------------Original Mail ------------------
Sender:赵一旦 <[hidden email]>
Send Date:Sun Feb 7 16:13:57 2021
Recipients:Till Rohrmann <[hidden email]>
CC:Robert Metzger <[hidden email]>, user <[hidden email]>
Subject:Re: flink kryo exception
It may also have something to do with my job's first tasks. The second task has two inputs: one is the Kafka source stream (A), the other is a self-defined MySQL source used as a broadcast stream (B).
In A: I have a 'WatermarkReAssigner', a self-defined operator that adds an offset to its input watermark and then forwards it downstream.
In B: The parallelism is 30, but in my rich function's implementation only subtask 0 queries MySQL and emits records; the other subtasks do nothing. All subtasks emit max_watermark - 86400_000 as the watermark.
Since both first tasks contain a self-defined source or operator, I do not know whether the problem has something to do with them.

On Sun, Feb 7, 2021 at 4:05 PM 赵一旦 <[hidden email]> wrote:
The first problem is critical, since the savepoint does not work.
For the second problem, I changed the solution: I removed the 'Map'-based implementation before the data is transferred to the second task, and in this case the savepoint works. The only difference is in the procedure. Normally I would stop the job, note the savepoint path, and then restart the job from that path. What happens now is: I stop the job, the job fails, and it then restarts automatically from the generated savepoint. So I no longer need to restart the job myself, since what it does automatically is what I wanted to do anyway.

I suspect the problem may also be related to the data, so I am not sure I can provide an example that reproduces it.

On Sat, Feb 6, 2021 at 12:13 AM Till Rohrmann <[hidden email]> wrote:
Could you provide us with a minimal working example which reproduces the problem for you? This would be super helpful in figuring out the problem you are experiencing. Thanks a lot for your help.

Cheers,
Till

On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <[hidden email]> wrote:
Yes. And if they were different, why would my job run normally? The problem only occurs when I stop it.

On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <[hidden email]> wrote:
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)
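
One way to check this is to compare checksums of the jar files on each machine. The following is only a minimal sketch using the plain JDK; the class name LibChecksums and the default directory "lib" are assumptions made for illustration, not part of Flink. It prints one SHA-256 line per jar, so the output from two machines can be compared with a diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper (not part of Flink): lists SHA-256 checksums of the jars in a directory.
class LibChecksums {

    // Returns file name -> hex SHA-256 for every *.jar directly inside libDir, sorted by name.
    static Map<String, String> sha256OfJars(Path libDir) {
        Map<String, String> result = new TreeMap<>();
        try (Stream<Path> files = Files.list(libDir)) {
            files.filter(p -> p.getFileName().toString().endsWith(".jar"))
                 .forEach(p -> result.put(p.getFileName().toString(), sha256(p)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Hashes the whole file content and renders the digest as lowercase hex.
    static String sha256(Path file) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(Files.readAllBytes(file));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed default: a directory named "lib" relative to the working directory.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        sha256OfJars(libDir).forEach((name, hash) -> System.out.println(hash + "  " + name));
    }
}
```

Running it on every machine against the Flink lib folder and diffing the outputs would show any jar that differs in content or is missing.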


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink 1.12.0; aligned checkpoints only; standalone cluster.



On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <[hidden email]> wrote:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I no longer think this is a problem in my code; maybe it is a bug?

On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <[hidden email]> wrote:
Hi all, I found that the failure always occurs in the second task, the one after the source task. So I changed the first chained task to transform the 'Map'-based object into an ordinary class object, and the problem disappeared.

With the new solution, I also tried stopping the job and restoring it from a savepoint (all successful).
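
A minimal sketch of that kind of conversion is shown here, assuming hypothetical field names (timestamp, uid) and a hypothetical class name AccessLogPojo; the real job's classes are not shown in this thread. The idea is to copy the known keys out of the map into typed fields that have a no-argument constructor and getters/setters, so that the record no longer needs a generic map serializer.

```java
import java.util.Map;

// Hypothetical POJO with explicitly typed fields; the field names are assumptions.
// In a real Flink job the class would also need to be declared public in its own file.
class AccessLogPojo {
    private long timestamp;
    private String uid;

    // Flink's POJO rules require a public no-argument constructor.
    public AccessLogPojo() {}

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }

    // Copies the known keys out of a map-based record; unknown keys are dropped.
    static AccessLogPojo fromMap(Map<Object, Object> map) {
        AccessLogPojo pojo = new AccessLogPojo();
        // Same fallback order as MapRecord.getTimestamp(): "timestamp", then "server_time", then 0.
        Object ts = map.getOrDefault("timestamp", map.getOrDefault("server_time", 0L));
        pojo.setTimestamp(ts instanceof Number ? ((Number) ts).longValue() : 0L);
        Object uid = map.get("uid");
        pojo.setUid(uid == null ? null : uid.toString());
        return pojo;
    }
}
```

The trade-off is that keys which are not mapped to a field are lost, so this only fits when the set of fields needed downstream is known in advance.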

However, I ran into another problem. It also occurs when I stop the job, and also in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also about serialization and deserialization, but not related to Kryo this time.



Re: Re: flink kryo exception

Piotr Nowojski-4
Hi,

As Kezhu Wang pointed out, this MIGHT BE caused by the https://issues.apache.org/jira/browse/FLINK-21028 issue.

During the stop-with-savepoint procedure, the source thread might be interrupted, leaving the whole application in an invalid and inconsistent state. In Flink 1.12.x one potential symptom is that some data is lost during serialisation.

When a record spans more than one buffer, the source thread might be interrupted while it is waiting for or requesting the next buffer to serialise the remaining part of the record. If that happens, the remaining part of the record might simply be lost, causing all kinds of deserialization errors (EOF, exceptions, stream corrupted etc.) on the downstream task.
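
To illustrate the effect (this is not Flink's network stack, only a plain-JDK sketch with a hypothetical class name TruncatedRecordDemo and a made-up record format), the following writes two records back to back, drops the last bytes of the first one, and then tries to read both. The reader has no way to know that bytes are missing, so it consumes the start of the second record as the end of the first and fails on whatever comes next.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo: a record is a length-prefixed UTF string followed by an 8-byte long.
class TruncatedRecordDemo {

    // Serializes one record into its own byte array.
    static byte[] serialize(String key, long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Simulates a sender that loses the last `lost` bytes of `first` before `second` is written.
    static byte[] concatDroppingTail(byte[] first, int lost, byte[] second) {
        int kept = first.length - lost;
        byte[] stream = Arrays.copyOf(first, kept + second.length);
        System.arraycopy(second, 0, stream, kept, second.length);
        return stream;
    }

    // Tries to read `count` records; returns them, or the name of the exception that stopped the read.
    static String tryRead(byte[] stream, int count) {
        StringBuilder records = new StringBuilder();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream))) {
            for (int i = 0; i < count; i++) {
                records.append(in.readUTF()).append('=').append(in.readLong()).append(';');
            }
            return records.toString();
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        byte[] first = serialize("timestamp", 1612342435179L);
        byte[] second = serialize("server_time", 42L);
        // Intact stream: both records are read back.
        System.out.println(tryRead(concatDroppingTail(first, 0, second), 2));
        // Four bytes of the first record lost: the reader runs off the end of the stream.
        System.out.println(tryRead(concatDroppingTail(first, 4, second), 2));
    }
}
```

In this sketch the truncated stream ends in an EOFException; with a different number of lost bytes or a different record layout, the same kind of loss could surface as a different error, which would be consistent with the exception changing from run to run.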

Piotrek

pon., 8 lut 2021 o 05:45 赵一旦 <[hidden email]> napisał(a):
yes, but I use stop not cancel, which also stop and cancel the job together.

Yun Gao <[hidden email]> 于2021年2月8日周一 上午11:59写道:
Hi yidan,

One more thing to confirm: are you create the savepoint and stop the job all together with 

 bin/flink cancel -s [:targetDirectory] :jobId
command ?

Best,
 Yun


------------------Original Mail ------------------
Sender:赵一旦 <[hidden email]>
Send Date:Sun Feb 7 16:13:57 2021
Recipients:Till Rohrmann <[hidden email]>
CC:Robert Metzger <[hidden email]>, user <[hidden email]>
Subject:Re: flink kryo exception
It also maybe have something to do with my job's first tasks. The second task have two input, one is the kafka source stream(A), another is self-defined mysql source as broadcast stream.(B) 
In A: I have a 'WatermarkReAssigner', a self-defined operator which add an offset to its input watermark and then forward to downstream.
In B: The parallelism is 30, but in my rich function's implementation, only the subtask-0 will do mysql query and send out records, other subtasks do nothing. All subtasks will send max_watermark - 86400_000 as the watermark.
Since both the first task have some self-defined source or implementation, I do not know whether the problem have something to do with it.

赵一旦 <[hidden email]> 于2021年2月7日周日 下午4:05写道:
The first problem is critical, since the savepoint do not work.
The second problem, in which I changed the solution, removed the 'Map' based implementation before the data are transformed to the second task, and this case savepoint works.  The only problem is that, I should stop the job and remember the savepoint path, then restart job with the savepoint path. And now it is : I stop the job, then the job failed and restart automatically with the generated savepoint.  So I do not need to restart the job anymore, since what it does automatically is what I want to do.

I have some idea that maybe it is also related to the data? So I am not sure that I can provide an example to reproduces the problem.  

Till Rohrmann <[hidden email]> 于2021年2月6日周六 上午12:13写道:
Could you provide us with a minimal working example which reproduces the problem for you? This would be super helpful in figuring out the problem you are experiencing. Thanks a lot for your help.

Cheers,
Till

On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <[hidden email]> wrote:
Yeah, and if it is different, why my job runs normally.  The problem only occurres when I stop it. 

Robert Metzger <[hidden email]> 于2021年2月5日周五 下午7:08写道:
Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (It can happen quite easily in a distributed standalone setup that some files are different)


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <[hidden email]> wrote:
Flink1.12.0; only using aligned checkpoint; Standalone Cluster; 



Robert Metzger <[hidden email]> 于2021年2月5日周五 下午6:52写道:
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying Flink, which state backend are you using, what kind of job (I guess DataStream API))

Somehow the process receiving the data is unable to deserialize it, most likely because they are configured differently (different classpath, dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <[hidden email]> wrote:
I do not think this is some code related problem anymore, maybe it is some bug?

赵一旦 <[hidden email]> 于2021年2月5日周五 下午4:30写道:
Hi all, I find that the failure always occurred in the second task, after the source task. So I do something in the first chaining task, I transform the 'Map' based class object to another normal class object, and the problem disappeared.

With the new solution, I also tried stopping the job and restoring it from the savepoint (both succeeded).
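
A minimal sketch of the kind of conversion described above, assuming a flat target class. The names `AccessRecord`, `MapToPojo` and the `uid` field are hypothetical; only the keys "timestamp" and "server_time" come from the MapRecord class quoted further down in this thread. The timestamp fallback is similar to, but not identical with, the one in MapRecord.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical flat record with explicitly typed fields.
// Note: to qualify as a Flink POJO, the real class would have to be public
// (in its own file) and keep the public no-argument constructor.
class AccessRecord {
    public long timestamp;
    public String uid; // hypothetical field, for illustration only

    public AccessRecord() {}
}

final class MapToPojo {

    /** Copies the known keys out of a schemaless map into typed fields. */
    static AccessRecord fromMap(Map<?, ?> raw) {
        AccessRecord r = new AccessRecord();

        // Prefer "timestamp", fall back to "server_time", then to 0.
        Object ts = raw.get("timestamp");
        if (!(ts instanceof Number)) {
            ts = raw.get("server_time");
        }
        r.timestamp = ts instanceof Number ? ((Number) ts).longValue() : 0L;

        Object uid = raw.get("uid");
        r.uid = uid == null ? null : uid.toString();
        return r;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612513286000L);
        raw.put("uid", "u1");
        AccessRecord r = fromMap(raw);
        System.out.println(r.timestamp + " " + r.uid);
    }
}
```

The point of the design is that everything sent to the next task has a fixed, typed shape, so no per-entry class information has to travel with each record.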

But I also ran into another problem. It also occurs when I stop the job, and again in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also about serialization and deserialization, but not related to Kryo this time.
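
For readers unfamiliar with this kind of failure, here is a minimal sketch of how reading a length-prefixed string from a truncated buffer ends in an EOFException. It uses java.io's `writeUTF`/`readUTF` as a stand-in, not Flink's own string encoding, and the class name `TruncatedReadDemo` is hypothetical. It illustrates the symptom only and is not a diagnosis of this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class TruncatedReadDemo {

    /** Encodes a string as a 2-byte length prefix followed by its bytes. */
    static byte[] encode(String s) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    /** Returns true if decoding the bytes hits end-of-input before the string is complete. */
    static boolean failsWithEof(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();
            return false;
        } catch (EOFException e) {
            // The length prefix promised more bytes than the buffer holds.
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] full = encode("server_time");
        byte[] truncated = Arrays.copyOf(full, full.length - 1);
        System.out.println(failsWithEof(full));      // prints false
        System.out.println(failsWithEof(truncated)); // prints true
    }
}
```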


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <[hidden email]> wrote:
From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your MapRecord map.


Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <[hidden email]> wrote:
Here are some facts that are possibly related, since another job does not hit these exceptions.
The problem job uses a class that contains a field of type MapRecord, and MapRecord is defined to extend HashMap so that it can accept variable JSON data.

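Class MapRecord:
import java.io.Serializable;
import java.util.HashMap;
import java.util.Optional;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap<Object, Object> implements Serializable {
    // @Override is kept as posted; the declarations being overridden are not shown in the post.
    @Override
    public void setTimestamp(Long timestamp) {
        // Store the value under both keys so either one can be read back.
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            // Prefer "timestamp", fall back to "server_time", then to 0.
            Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}
Class UserAccessLog:
public class UserAccessLog extends AbstractRecord<UserAccessLog> {
    private MapRecord d; // I think this is related to the problem...
    ... ...
}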

On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <[hidden email]> wrote:
Actually, the exception is different every time I stop the job.
For example:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

...

On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <[hidden email]> wrote:
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <[hidden email]> wrote:
I have a job whose checkpoints and savepoints all work fine.
But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        ... 22 more