Has anyone encountered this or know what might be causing it?
java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
	at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
	at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
	at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
	at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
	... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
	... 12 more
|
This happened when running Flink with bin/run-local.sh. I notice that there appears to be only one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?
From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 6:39 PM
To: "[hidden email]" <[hidden email]>
Subject: AWS exception serialization problem
Has anyone encountered this or know what might be causing it?
|
I think my previous guess was wrong. From what I can tell, when Kryo copies the exception object, it does so by serializing and deserializing it. It doesn't know how to handle subclasses of RuntimeException itself, so it delegates serialization to Java. However, it doesn't use a custom ObjectInputStream that overrides resolveClass() to provide classes from the user code classloader, as happens in RocksDBStateBackend's use of InstantiationUtil.deserializeObject(). Instead, class resolution goes through ObjectInputStream.latestUserDefinedLoader(), which here is the Launcher$AppClassLoader, and that loader definitely doesn't have the user code in it.
Could this be a bug in TrySerializer#copy, or somewhere in the way Kryo is being configured?
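To make the copy path concrete, here is a minimal JDK-only sketch of copying a value by serializing and deserializing it with plain Java serialization. It is an illustration, not Kryo's `JavaSerializer` code, and the class name `JavaSerializationCopy` is hypothetical. The relevant detail is that the `ObjectInputStream` is created without any class loader, so class lookup is left entirely to its default `resolveClass()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaSerializationCopy {

    /**
     * Copies a value by writing it with Java serialization and reading it back.
     * The ObjectInputStream is given no class loader, so it relies on its default resolveClass().
     */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // A ClassNotFoundException here is the kind of failure seen in the reported trace.
            throw new IllegalStateException("Java-serialization copy failed", e);
        }
    }

    public static void main(String[] args) {
        RuntimeException original = new IllegalStateException("simulated S3 failure");
        RuntimeException copied = copy(original);
        System.out.println(copied.getClass().getName() + ": " + copied.getMessage());
    }
}
```

On a plain JDK this prints `java.lang.IllegalStateException: simulated S3 failure`, because a JDK exception class is visible from any loader; the lookup only breaks when the class lives in user code that the default loader cannot see.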
Thanks,
Shannon
From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 7:09 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: AWS exception serialization problem
This happened when running Flink with bin/run-local.sh. I notice that there appears to be only one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?
|
Ah, I see...
The issue is that the Kafka fetcher thread apparently does not have the user-code class loader set as its context class loader. Kryo relies on that for class resolution. What Flink version are you on? I think that actual processing and forwarding no longer happens in the Kafka fetchers as of 1.2, so only Flink 1.1 should be affected...
On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <[hidden email]> wrote:
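Related to this hypothesis, the following JDK-only experiment is a sketch (`ContextLoaderIgnoredDemo` and `RecordingClassLoader` are hypothetical names, not Flink or Kryo classes). It installs a recording loader as the thread's context class loader and then deserializes an exception with a plain `ObjectInputStream`, the same call the trace shows Kryo's `JavaSerializer` making. Per the documented `ObjectInputStream.resolveClass()` behaviour, the default lookup uses the loader of the closest user-defined class on the call stack rather than the context class loader, so the recorder should not be asked for the exception class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ContextLoaderIgnoredDemo {

    /** Hypothetical loader that records each class name it is asked for, then delegates to its parent. */
    static final class RecordingClassLoader extends ClassLoader {
        final Set<String> requested = Collections.synchronizedSet(new HashSet<String>());

        RecordingClassLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            requested.add(name);
            return super.loadClass(name, resolve);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /**
     * Deserializes value with a plain ObjectInputStream while a RecordingClassLoader is the
     * thread's context class loader, and returns the class names that loader was asked for.
     */
    static Set<String> namesRequestedFromContextLoader(Object value) {
        RecordingClassLoader recorder =
            new RecordingClassLoader(ContextLoaderIgnoredDemo.class.getClassLoader());
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(recorder);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialize(value)))) {
            in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        } finally {
            // Always restore the original context class loader.
            thread.setContextClassLoader(previous);
        }
        return recorder.requested;
    }

    public static void main(String[] args) {
        Set<String> requested = namesRequestedFromContextLoader(new IllegalStateException("boom"));
        System.out.println("Context class loader was asked for: " + requested);
    }
}
```

If that holds, setting the user-code loader as the context class loader would not by itself fix exceptions that go through plain Java serialization.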
|
Hi,
I just had a quick look at this, but the Kafka fetcher thread's context classloader doesn't seem to be the issue (at least for 1.1.4). I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 was using, and it's `FlinkUserCodeClassLoader`.
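For reference, a check like the one described can be done with plain JDK calls. The sketch below uses a hypothetical `ClassLoaderChain.describeChain` helper (not a Flink API) that lists a loader and its parents; calling it with `Thread.currentThread().getContextClassLoader()` from code running on the thread in question shows which loader that thread carries. On Java 8 the JVM's application loader appears as `sun.misc.Launcher$AppClassLoader`, the loader named in the trace.

```java
import java.util.ArrayList;
import java.util.List;

public class ClassLoaderChain {

    /** Lists the loader's class name and those of its parents, ending with "bootstrap" for the null parent. */
    static List<String> describeChain(ClassLoader loader) {
        List<String> chain = new ArrayList<>();
        for (ClassLoader current = loader; current != null; current = current.getParent()) {
            chain.add(current.getClass().getName());
        }
        chain.add("bootstrap");
        return chain;
    }

    public static void main(String[] args) {
        // In a job, call this from code running on the thread you want to inspect.
        System.out.println(describeChain(Thread.currentThread().getContextClassLoader()));
    }
}
```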
On March 7, 2017 at 7:32:35 PM, Stephan Ewen ([hidden email]) wrote:
|
@Shannon @Gordon - is there some shading logic involved in the dependencies, concerning the AWS libraries?
On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
> is there some shading logic involved in the dependencies, concerning the AWS libraries?
Not that I am aware of. The AWS code is included in the job's fat jar as-is.
|
Hi Shannon, Just to clarify: from the error trace, it seems that the messages fetched from Kafka are serialized `AmazonS3Exception`s, and you're emitting a stream of `AmazonS3Exception`s as records from the FlinkKafkaConsumer? Is this correct? If so, I think we should just make sure that the `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user fat jar. Also, what Flink version are you using? Cheers, Gordon
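One way to check that suggestion directly is a small visibility test like the sketch below (a hypothetical helper using JDK calls only). It asks a specific loader for a class name without initializing the class. The result depends on which loader you pass: per the trace, the failing lookup went through `Launcher$AppClassLoader`, so a class that is visible to the user-code loader can still fail to resolve there.

```java
public class ClassVisibilityCheck {

    /** Returns true if className can be resolved through loader, without running static initializers. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. when a dependency of the class is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "com.amazonaws.services.s3.model.AmazonS3Exception";
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
        System.out.println(name + " visible from context loader: " + isLoadable(name, contextLoader));
        System.out.println(name + " visible from system loader: " + isLoadable(name, systemLoader));
    }
}
```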
|
Hi, We have seen something similar in Flink 1.2. We have an operation that parses some JSON, and when parsing fails, we see a ClassNotFoundException for the relevant exception (in our case JsResultException from the play-json library). The library is indeed in the shaded JAR; otherwise we would not be able to parse the JSON. Cheers, Bruno
On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?
On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <[hidden email]> wrote:
|
Hi Stephan, we are running Flink 1.2.0 on YARN (an AWS EMR cluster).
On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <[hidden email]> wrote:
|
Can one of you guys provide us with a minimal example to reproduce the issue? (Ideally locally, not using EMR?) I think once we can reproduce the issue, it's easy to fix.
On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <[hidden email]> wrote:
|
Here ya go (see attached).
From: Robert Metzger <[hidden email]>
Date: Friday, March 10, 2017 at 1:18 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: AWS exception serialization problem
Can one of you guys provide us with a minimal example to reproduce the issue? (Ideally locally, not using EMR?)
I think once we can reproduce the issue, it's easy to fix.
On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <[hidden email]> wrote:
flinktryproblem.zip (85K)
|
Hi Shannon,
Thanks a lot for providing the example; it was very helpful in reproducing the problem. I think this is actually a Kryo bug that was just recently fixed:
https://github.com/EsotericSoftware/kryo/pull/483
The fix will be available in Kryo 4.0.1, which unfortunately doesn't seem to have been released yet.
The problem is that when Kryo falls back to Java serialization for the exception instance, the `ObjectInputStream` used to read the object does not use Kryo's configured class loader (i.e., the user code class loader). That's why it complains that the class cannot be found. We can work around this by registering our own `JavaSerializer` as the serializer for Throwables in Kryo, but I'm not sure whether we should actually do this or just wait for the Kryo fix to be released.
- Gordon
On March 11, 2017 at 9:54:03 AM, Shannon Carey ([hidden email]) wrote:
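A JDK-only sketch of the kind of fix described here, assuming it amounts to an `ObjectInputStream` whose `resolveClass()` uses an explicitly supplied loader (the same pattern Shannon points to in Flink's `InstantiationUtil.deserializeObject()`). This is not the code from the Kryo pull request or from Flink; the class names are hypothetical, and `RecordingClassLoader` exists only to show that the supplied loader is actually consulted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ClassLoaderAwareCopy {

    /** ObjectInputStream that resolves classes through an explicitly supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // Look the class up through the configured loader (e.g. the user-code loader) first.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive type descriptors.
                return super.resolveClass(desc);
            }
        }
    }

    /** Copies a value via Java serialization, resolving classes through the given loader. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T original, ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java-serialization copy failed", e);
        }
    }

    /** Hypothetical loader that records requested class names, then delegates to its parent. */
    static final class RecordingClassLoader extends ClassLoader {
        final Set<String> requested = Collections.synchronizedSet(new HashSet<String>());

        RecordingClassLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            requested.add(name);
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) {
        RecordingClassLoader userCodeLoader =
            new RecordingClassLoader(ClassLoaderAwareCopy.class.getClassLoader());
        RuntimeException copied = copy(new IllegalStateException("boom"), userCodeLoader);
        System.out.println(copied.getMessage() + ", supplied loader consulted: "
            + userCodeLoader.requested.contains("java.lang.IllegalStateException"));
    }
}
```

Falling back to `super.resolveClass()` keeps the JDK's default lookup for anything the supplied loader cannot find, including primitive type descriptors, which `Class.forName` does not resolve by name. Inside a Kryo serializer, the supplied loader would presumably be the one returned by `kryo.getClassLoader()`.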
|
FYI: Here’s the JIRA ticket to track this issue - https://issues.apache.org/jira/browse/FLINK-6025. On March 11, 2017 at 6:27:36 PM, Tzu-Li (Gordon) Tai ([hidden email]) wrote:
|
Thank you for analyzing the problem, Gordon! We cannot upgrade Kryo anytime soon because state in old savepoints is still serialized with the current Kryo version. I would propose adding our own JavaThrowableSerializer to Flink and documenting how users can register that serializer if they run into the error. Shannon and Bruno can just use the serializer with their current Flink version.
On Sat, Mar 11, 2017 at 12:00 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|