Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a custom source and ingesting into an Elasticsearch cluster (v5.6). Recently we have been seeing more and more exceptions like these:
com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com. ^
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 13 more

or

com.esotericsoftware.kryo.KryoException: Unable to find class: com.fasterxml.jackson.databind.node.DoubleNod com.fasterxml.jackson.databind.node.ObjectNode
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.node.DoubleNod com.fasterxml.jackson.databind.node.ObjectNode
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 19 more

I guess the serialization between steps in the TaskManager fails somewhere. Unfortunately, it happens very unpredictably. My question is: has anyone seen this before? If so, how did you go about debugging it?

Right now we see this problem mostly with high-volume event processing, so it only appears under high load. I already tried to investigate with the TRACE log level, but that keeps the instance busy writing tons of logs, which slows down processing, and then the exception is no longer triggered. I'm wondering whether there is another way to investigate this.

Thanks in advance for any hints on how to debug this.
Additionally, we see these exceptions alongside the ones above all the time:

com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

or

com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 12 more

On Thu, Jun 27, 2019 at 6:29 PM Fabian Wollert <[hidden email]> wrote:
|
Hi Fabian,

we had similar errors with Flink 1.3 [1][2], and the error was caused by the fact that a serializer was sharing the same object with multiple threads. The error was not deterministic and changed from time to time. So maybe it could be something similar here (IMHO).

Best,
Flavio

On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert <[hidden email]> wrote:
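For illustration only, here is a toy sketch of the kind of failure Flavio describes: a decoder that keeps per-record state and is used by more than one reader. ToyDecoder and its methods are hypothetical names made up for this example; this is not Kryo or Flink code, and nothing in this thread confirms that this is the cause of the exceptions above. The interleaving is written out by hand so that the outcome is deterministic; with real threads it would only happen occasionally.

```java
import java.util.ArrayList;
import java.util.List;

class SharedDecoderSketch {

    /** Hypothetical decoder that remembers values read so far in the current record. */
    static final class ToyDecoder {
        private final List<String> seen = new ArrayList<>();

        /** Starts a new record and forgets the values of the previous one. */
        void beginRecord() {
            seen.clear();
        }

        /** Reads a literal value and remembers it for later back-references. */
        String readValue(String value) {
            seen.add(value);
            return value;
        }

        /** Resolves a back-reference to a value read earlier in the same record. */
        String readReference(int index) {
            return seen.get(index);
        }
    }

    /** One decoder per reader: the back-reference resolves as expected. */
    static String decodeWithOwnInstance() {
        ToyDecoder decoder = new ToyDecoder();
        decoder.beginRecord();
        decoder.readValue("x");
        decoder.readValue("y");
        return decoder.readReference(1);
    }

    /** Two readers interleaved on one decoder: reader A's back-reference breaks. */
    static boolean decodeWithSharedInstanceFails() {
        ToyDecoder shared = new ToyDecoder();
        shared.beginRecord();            // reader A starts a record
        shared.readValue("x");
        shared.readValue("y");
        shared.beginRecord();            // reader B starts a record on the same instance
        shared.readValue("z");
        try {
            shared.readReference(1);     // reader A resumes; its state is gone
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeWithOwnInstance());          // prints "y"
        System.out.println(decodeWithSharedInstanceFails());  // prints "true"
    }
}
```

The second case ends in an IndexOutOfBoundsException from ArrayList.get, which is at least superficially similar to the "Index: 97, Size: 29" trace; whether the real job hits the same mechanism is an open question.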
|
Indeed, looking at StreamElementSerializer, the duplicate() method could be buggy:

    public StreamElementSerializer<T> duplicate() {
        TypeSerializer<T> copy = typeSerializer.duplicate();
        return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
    }

Is it safe to return this when copy == typeSerializer ...?

On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <[hidden email]> wrote:
|
Any news on this? Have you found the cause of the error? On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <[hidden email]> wrote:
|
No, not yet. We lack some of the knowledge needed to understand this. The only thing we have found out is that it most probably happens in the Elasticsearch sink, because:

- some error messages have the sink in their stack trace.
- when bumping the ES node specs on AWS, the error happens less often (we haven't bumped them to super large instances yet, nor reached a state where the errors go away completely; this would also not be the ideal fix).

So my current assumption is that backpressure is not being applied correctly. But this is very vague; any other hints or support on this are highly appreciated.

On Thu, Jul 4, 2019 at 11:26 AM Flavio Pompermaier <[hidden email]> wrote:
|
I quickly checked the implementation of duplicate() for both the KryoSerializer and StreamElementSerializer (which are the only serializers involved here). They seem to be correct; especially for the KryoSerializer, since FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when duplicating it, and therefore Kryo instances should not be shared at all across duplicates. This seems to rule out any duplication issues with the serializers.

A possibly relevant question: @Fabian do you register any types / serializers via ExecutionConfig.registerKryoType(...) / ExecutionConfig.registerTypeWithKryoSerializer(...)?

Best,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-8836

On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert <[hidden email]> wrote:
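As a rough sketch of the duplicate() pattern discussed in this thread: the interface and classes below are hypothetical stand-ins written for this example, not Flink's TypeSerializer, KryoSerializer, or StreamElementSerializer. The sketch assumes a contract in which duplicate() may return the same instance only when the serializer holds no mutable state, and must return an independent copy otherwise. Under that assumption, a wrapper with no mutable state of its own can return this whenever its nested serializer did.

```java
class DuplicateContractSketch {

    /** Hypothetical serializer contract (an assumption for this sketch). */
    interface ToySerializer {
        /** Returns this if stateless, otherwise an independent copy. */
        ToySerializer duplicate();
    }

    /** Stateless: safe to share, so duplicate() hands back the same instance. */
    static final class StatelessSerializer implements ToySerializer {
        @Override
        public ToySerializer duplicate() {
            return this;
        }
    }

    /** Stateful: owns a mutable scratch buffer, so every duplicate gets its own. */
    static final class StatefulSerializer implements ToySerializer {
        final StringBuilder scratch = new StringBuilder();

        @Override
        public ToySerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Wrapper whose only state is the nested serializer. */
    static final class WrapperSerializer implements ToySerializer {
        final ToySerializer nested;

        WrapperSerializer(ToySerializer nested) {
            this.nested = nested;
        }

        @Override
        public ToySerializer duplicate() {
            ToySerializer copy = nested.duplicate();
            // Same nested instance means the nested serializer declared itself shareable.
            return (copy == nested) ? this : new WrapperSerializer(copy);
        }
    }

    public static void main(String[] args) {
        WrapperSerializer overStateless = new WrapperSerializer(new StatelessSerializer());
        WrapperSerializer overStateful = new WrapperSerializer(new StatefulSerializer());

        // Stateless nested serializer: the wrapper is shared as well.
        System.out.println(overStateless.duplicate() == overStateless); // prints "true"

        // Stateful nested serializer: new wrapper around a new nested instance.
        WrapperSerializer dup = (WrapperSerializer) overStateful.duplicate();
        System.out.println(dup != overStateful && dup.nested != overStateful.nested); // prints "true"
    }
}
```

In this toy model, returning this is only as safe as the nested serializer's own duplicate(); if a stateful serializer wrongly returned itself, the wrapper would silently be shared too.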
|
@Fabian do you register any types / serializers via ExecutionConfig.registerKryoType(...) / ExecutionConfig.registerTypeWithKryoSerializer(...)?

Nope, not at all. Our Flink job code does not contain the word "Kryo" anywhere.

Thanks for looking into it ...

On Thu, Jul 4, 2019 at 11:51 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|