AWS exception serialization problem

Shannon Carey
Has anyone encountered this, or does anyone know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
	at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
	at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
	at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
	at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
	... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
	... 12 more

Re: AWS exception serialization problem

Shannon Carey
This happened when running Flink with bin/run-local.sh. There only appears to be one Java process, so the job manager and the task manager run in the same JVM, right? However, I notice that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to two different JARs?


From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 6:39 PM
To: "[hidden email]" <[hidden email]>
Subject: AWS exception serialization problem

Re: AWS exception serialization problem

Shannon Carey
I think my previous guess was wrong. From what I can tell, Kryo copies the exception object by serializing it and then deserializing it. It doesn't know how to handle subclasses of RuntimeException itself, so it delegates their serialization to Java serialization. However, it doesn't use a custom ObjectInputStream that overrides resolveClass() to load classes from the user-code classloader, as RocksDBStateBackend does through InstantiationUtil.deserializeObject(). Instead, the plain ObjectInputStream resolves classes through ObjectInputStream.latestUserDefinedLoader(), which here is the Launcher$AppClassLoader, and that loader definitely doesn't contain the user code.
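The missing piece described above (resolving classes against the user-code classloader during the Java-serialization fallback) can be illustrated with plain JDK classes. The sketch below is a hypothetical helper, not Flink's InstantiationUtil and not Kryo's JavaSerializer: it copies an object by serializing and then deserializing it, and overrides resolveClass() so that lookup goes through a loader supplied by the caller instead of the JDK default. The names ClassLoaderAwareCopy and ClassLoaderObjectInputStream are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper (not Flink or Kryo API): deep-copies an object via Java
// serialization, resolving classes against an explicitly supplied ClassLoader.
class ClassLoaderAwareCopy {

    // A plain ObjectInputStream resolves classes via latestUserDefinedLoader();
    // this subclass uses the loader it is given instead.
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // initialize = false, matching the JDK's default resolveClass
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to default resolution (this also covers primitive types)
                return super.resolveClass(desc);
            }
        }
    }

    // "Copy by round trip": serialize, then deserialize with the supplied loader.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value, ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy " + value.getClass().getName(), e);
        }
    }

    public static void main(String[] args) {
        RuntimeException original =
                new RuntimeException("boom", new IllegalStateException("cause"));
        RuntimeException copied =
                copy(original, Thread.currentThread().getContextClassLoader());
        System.out.println(copied.getMessage() + " / " + copied.getCause().getMessage());
    }
}
```

In a setup like the one in this thread, the caller would pass the user-code classloader. With a plain ObjectInputStream, the same round trip would be expected to fail with ClassNotFoundException whenever a class in the object graph (here com.amazonaws.services.s3.model.AmazonS3Exception) is visible only to that loader.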

Could this be a bug in TrySerializer#copy, or in wherever Kryo is being configured?

Thanks,
Shannon


From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 7:09 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: AWS exception serialization problem

Re: AWS exception serialization problem

Stephan Ewen
Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the user-code class loader set as its context class loader. Kryo relies on that for class resolution.
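If the serializer picks up the thread's context class loader, as described above for Kryo, the usual remedy is to make sure the right loader is installed on whichever thread performs the (de)serialization. A minimal sketch of that pattern using only JDK calls; withContextClassLoader and the empty URLClassLoader standing in for the user-code loader are illustrative assumptions, not Flink code.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Hypothetical utility (not Flink API): runs an action with the given context
// classloader installed on the current thread, then restores the previous one.
class ContextLoaderScope {

    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Restore even if the action throws, so the thread is left as it was
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a user-code classloader (assumption for the demo)
        ClassLoader userCode = new URLClassLoader(
                new URL[0], ContextLoaderScope.class.getClassLoader());
        ClassLoader seen = withContextClassLoader(
                userCode, () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == userCode);
    }
}
```

Restoring the previous loader in a finally block keeps the change scoped to the action, which matters when the same thread later runs code that expects its original loader.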

Which Flink version are you on? I think that, as of 1.2, the actual processing and forwarding no longer happens in the Kafka fetchers, so only Flink 1.1 should be affected.


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <[hidden email]> wrote:

Re: AWS exception serialization problem

Tzu-Li (Gordon) Tai
In reply to this post by Shannon Carey
Hi,

I just had a quick look at this, but the Kafka fetcher thread’s context classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, the fetcher runs in a separate thread created by the task thread. Since the task thread sets the user-code classloader as its context classloader, shouldn’t any thread created from it (i.e., the fetcher thread) inherit that classloader as well?

I quickly checked which context classloader the Kafka09Fetcher thread in 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
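The inheritance argument above can be checked with a few lines of plain Java: a Thread constructed while its creator has some context classloader starts out with that same loader. This is an illustrative sketch, not Flink code; loaderSeenByChild and the empty URLClassLoader used as a stand-in for the user-code loader are assumptions.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative check (not Flink code): a child thread inherits the context
// classloader its parent had at the moment the child was constructed.
class ContextLoaderInheritance {

    static ClassLoader loaderSeenByChild(ClassLoader parentLoader) {
        Thread parent = Thread.currentThread();
        ClassLoader previous = parent.getContextClassLoader();
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        parent.setContextClassLoader(parentLoader);
        try {
            // Inheritance happens here, when the Thread object is constructed
            Thread child = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()));
            child.start();
            child.join(); // join() makes the child's write visible to us
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for child thread", e);
        } finally {
            parent.setContextClassLoader(previous);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        ClassLoader userCode = new URLClassLoader(
                new URL[0], ContextLoaderInheritance.class.getClassLoader());
        System.out.println(loaderSeenByChild(userCode) == userCode);
    }
}
```

Because the loader is copied only at construction time, a thread that already existed before the task set its context classloader (for example, one taken from a pool) keeps whatever loader it was created with.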


On March 7, 2017 at 7:32:35 PM, Stephan Ewen ([hidden email]) wrote:

Ah, I see...

The issue is that the Kafka fetcher thread apparently do not have the user-code class loader set as the context class loader. Kryo relies on that for class resolution.

What Flink version are you on? I think that actual processing and forwarding does not happen in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <[hidden email]> wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to copy the exception object, it does that by serializing and deserializing it. For subclasses of RuntimeException, it doesn't know how to do it so it delegates serialization to Java. However, it doesn't use a custom ObjectInputStream to override resolveClass() and provide classes from the user code classloader… such as happens in RocksDBStateBackend's use of InstantiationUtil.deserializeObject(). Instead, it uses ObjectInputStream$latestUserDefinedLoader() which is the Launcher$AppClassLoader which definitely doesn't have the user code in it.

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being configured?

Thanks,
Shannon


From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 7:09 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh I notice that there only appears to be one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?


From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 6:39 PM
To: "[hidden email]" <[hidden email]>
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
        at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
        ... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
        ... 12 more


Re: AWS exception serialization problem

Stephan Ewen
@Shannon @Gordon - is there some shading logic involved in the dependencies, concerning the AWS libraries?


On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

I just had a quick look at this, but the Kafka fetcher thread’s context classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, the fetcher runs in a separate thread that is created by the task thread. Since the task thread sets the user code classloader as its context classloader, shouldn’t any thread created from it (i.e., the fetcher thread) use it as well?

I quickly checked which context classloader the Kafka09Fetcher thread in 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
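A minimal, stdlib-only sketch of the inheritance described here: in Java, a new `Thread` copies the context class loader of the thread that constructs it. The class `ContextLoaderInheritance`, the helper `loaderSeenByChild`, and the empty anonymous `ClassLoader` standing in for `FlinkUserCodeClassLoader` are all hypothetical; this is not Flink's fetcher code.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ContextLoaderInheritance {

    /**
     * Starts a "task" thread that installs taskLoader as its context class loader,
     * spawns a "fetcher" thread from inside it, and returns the context class loader
     * that the fetcher thread observes.
     */
    public static ClassLoader loaderSeenByChild(ClassLoader taskLoader) {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();

        Thread task = new Thread(() -> {
            // Stand-in for the task thread installing the user-code class loader.
            Thread.currentThread().setContextClassLoader(taskLoader);

            // Stand-in for the fetcher thread: a new Thread copies the context class
            // loader of the thread that constructs it (the task thread here).
            Thread fetcher = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()));
            fetcher.start();
            joinQuietly(fetcher);
        });
        task.start();
        joinQuietly(task);
        return seen.get();
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for FlinkUserCodeClassLoader: an empty child loader.
        ClassLoader userCodeLoader =
                new ClassLoader(ContextLoaderInheritance.class.getClassLoader()) {};
        System.out.println(loaderSeenByChild(userCodeLoader) == userCodeLoader); // prints "true"
    }
}
```

The copy happens when the `Thread` object is constructed, so a thread created before the task thread installs the user-code loader would not pick it up.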


On March 7, 2017 at 7:32:35 PM, Stephan Ewen ([hidden email]) wrote:

Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the user-code class loader set as its context class loader. Kryo relies on that for class resolution.

What Flink version are you on? I think that, as of 1.2, the actual processing and forwarding no longer happens in the Kafka fetchers, so only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <[hidden email]> wrote:
I think my previous guess was wrong. From what I can tell, when Kryo copies the exception object, it does so by serializing and deserializing it. Kryo doesn't know how to handle subclasses of RuntimeException itself, so it delegates serialization to Java. However, it doesn't use a custom ObjectInputStream that overrides resolveClass() to load classes from the user code classloader, as RocksDBStateBackend does via InstantiationUtil.deserializeObject(). Instead, ObjectInputStream falls back to its latestUserDefinedLoader(), which here is the Launcher$AppClassLoader, and that loader definitely doesn't contain the user code.
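The mechanism described above can be sketched with plain Java serialization, assuming the copy is a serialize/deserialize round trip: an `ObjectInputStream` subclass overrides `resolveClass()` to use an explicit class loader, and reading with a loader that cannot see the job's classes fails with a `ClassNotFoundException`, just like the reported trace. `LoaderAwareCopy`, `UserCodeException` (a stand-in for `AmazonS3Exception`), and `copy()` are hypothetical names; this is not the code of Kryo or of Flink's `InstantiationUtil`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class LoaderAwareCopy {

    /** Hypothetical stand-in for a class that only the job jar contains. */
    public static class UserCodeException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        public UserCodeException(String message) {
            super(message);
        }
    }

    /** Resolves classes through an explicit loader instead of latestUserDefinedLoader(). */
    private static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Sketch only: primitive type names such as "int" are not handled here.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Copies an object by serializing it and deserializing it with the given loader. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T copy(T original, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        UserCodeException original = new UserCodeException("simulated S3 failure");

        // A loader that can see the job's classes: the copy succeeds.
        UserCodeException copied = copy(original, LoaderAwareCopy.class.getClassLoader());
        System.out.println(copied.getMessage()); // prints "simulated S3 failure"

        // Parent == null: only bootstrap (JDK) classes are visible, mimicking a loader
        // that lacks the job jar, so resolving UserCodeException fails.
        ClassLoader jdkOnly = new ClassLoader((ClassLoader) null) {};
        try {
            copy(original, jdkOnly);
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```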

Is this a bug in TrySerializer#copy, or in the place where Kryo is configured?

Thanks,
Shannon


From: Shannon Carey <[hidden email]>
Date: Monday, March 6, 2017 at 7:09 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh. I notice that there appears to be only one Java process; the job manager and the task manager run in the same JVM, right? However, I also notice that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?





Re: AWS exception serialization problem

Shannon Carey
> is there some shading logic involved in the dependencies, concerning the AWS libraries?

Not that I am aware of. The AWS code is included in the job's fat jar as-is.


Re: AWS exception serialization problem

Tzu-Li (Gordon) Tai
Hi Shannon,

Just to clarify:

From the error trace, it seems like the messages fetched from Kafka are serialized `AmazonS3Exception`s, and you’re emitting a stream of `AmazonS3Exception`s as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon

Re: AWS exception serialization problem

elmosca
Hi,

We have seen something similar in Flink 1.2. We have an operation that parses some JSON, and when it fails to parse it, we can see the ClassNotFoundException for the relevant exception (in our case JsResultException from the play-json library). The library is indeed in the shaded JAR; otherwise we would not be able to parse the JSON.

Cheers,

Bruno


Re: AWS exception serialization problem

Stephan Ewen
@Bruno: How are you running Flink? On YARN, standalone, Mesos, Docker?



Re: AWS exception serialization problem

elmosca

Hi Stephan, we are running Flink 1.2.0 on YARN (AWS EMR cluster).




Re: AWS exception serialization problem

rmetzger0
Can one of you guys provide us with a minimal example to reproduce the issue (ideally locally, not using EMR)?
I think once we can reproduce the issue, it's easy to fix.




Re: AWS exception serialization problem

Shannon Carey
Here ya go (see attached).





Attachment: flinktryproblem.zip (85K)

Re: AWS exception serialization problem

Tzu-Li (Gordon) Tai
Hi Shannon,

Thanks a lot for providing the example, it was very helpful in reproducing the problem.

I think this is actually a Kryo bug that was only recently fixed: https://github.com/EsotericSoftware/kryo/pull/483
It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be released yet.

The problem is that when Kryo defaults to Java serialization for the exception instance, the `ObjectInputStream` used to read the object does not correctly use Kryo’s configured class loader (i.e., the user code class loader). That’s why it's complaining that the class cannot be found.

We could work around this by registering our own `JavaSerializer` as the serializer for Throwables in Kryo, but I’m not sure whether we should actually do this or just wait for the Kryo fix to be released.
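To see why a single registration for `Throwable` would cover `AmazonS3Exception`, `JsResultException`, and any other exception type, here is a toy model of default-serializer lookup by supertype. As far as we understand, Kryo chooses a default serializer in a similar first-match way using `isAssignableFrom`, but this class is not Kryo's API: `DefaultSerializerLookup`, `addDefault`, `serializerFor`, and the serializer names are hypothetical labels.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultSerializerLookup {

    /** One registration: every class assignable to baseType uses serializerName. */
    private static final class Entry {
        final Class<?> baseType;
        final String serializerName;

        Entry(Class<?> baseType, String serializerName) {
            this.baseType = baseType;
            this.serializerName = serializerName;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final String fallback;

    public DefaultSerializerLookup(String fallback) {
        this.fallback = fallback;
    }

    /** Registers a serializer for a base type and, implicitly, all of its subclasses. */
    public void addDefault(Class<?> baseType, String serializerName) {
        entries.add(new Entry(baseType, serializerName));
    }

    /** Returns the serializer of the first registration whose base type is a supertype of type. */
    public String serializerFor(Class<?> type) {
        for (Entry e : entries) {
            if (e.baseType.isAssignableFrom(type)) {
                return e.serializerName;
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        DefaultSerializerLookup lookup = new DefaultSerializerLookup("FieldSerializer");
        // Hypothetical label for a class-loader-aware replacement of Kryo's JavaSerializer.
        lookup.addDefault(Throwable.class, "ClassLoaderAwareJavaSerializer");

        // Any exception type matches the Throwable registration.
        System.out.println(lookup.serializerFor(IllegalStateException.class)); // prints "ClassLoaderAwareJavaSerializer"
        // Non-Throwable types fall through to the fallback.
        System.out.println(lookup.serializerFor(String.class)); // prints "FieldSerializer"
    }
}
```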

- Gordon





Re: AWS exception serialization problem

Tzu-Li (Gordon) Tai
FYI: Here’s the JIRA ticket to track this issue - https://issues.apache.org/jira/browse/FLINK-6025.





Re: AWS exception serialization problem

rmetzger0
Thank you for analyzing the problem Gordon!

We cannot upgrade Kryo anytime soon, because state in old savepoints is still serialized with the current Kryo version.
I propose adding our own JavaThrowableSerializer to Flink and documenting how users can register that serializer if they run into this error.

Shannon and Bruno can just use the serializer in their current Flink version.

