classloading strangeness with Avro in Flink

Cliff Resnick
Our Flink/YARN pipeline has been reading Avro from Kafka for a while now. We just introduced a source of Avro OCF (Object Container Files) read from S3. The Kafka Avro continued to decode without incident, but the OCF files failed 100% of the time with anomalous parse errors in the decoding phase, after the schema and codec had been read from them successfully. The pipeline worked on my laptop, and a test Main program submitted to the Flink session in YARN also decoded successfully. Only the actual pipeline, run from the TaskManager, failed. At one point I even remote-debugged the TaskManager process and stepped through what looked like a normal Avro decode (if you can describe Avro code as normal!) -- until it abruptly failed with an int decode or what-have-you.

This stumped me for a while, but I finally tried moving flink-avro.jar from the lib directory into the application jar, and that fixed it. I'm not sure why, especially since there were no typical classloader-type errors. The issue was observed on both Flink 1.5 and 1.6 in FLIP-6 mode.

-Cliff

Re: classloading strangeness with Avro in Flink

vino yang
Hi Cliff,

My guess is that this is caused by a conflict between the Avro used by the job and the Avro that the Flink framework itself relies on.
Flink provides configuration parameters that allow you to determine the order of the classloaders yourself. [1]
Alternatively, you can debug the classloading with the help of the documentation. [2]


Thanks, vino.
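
One way to start debugging classloading along these lines is to ask the JVM where a given class was actually loaded from. The following is a minimal sketch, not something taken from the thread: the class name `ClassOrigin` and the default target `org.apache.avro.Schema` are illustrative assumptions, and in a real job the `describe` call would need to run inside an operator on the TaskManager rather than from a standalone `main`.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes which classloader loaded a class and which jar or directory it came from. */
    public static String describe(Class<?> cls) {
        ClassLoader loader = cls.getClassLoader();
        CodeSource source = cls.getProtectionDomain().getCodeSource();

        // Classes from the JDK itself have a null loader and usually no code source.
        String loaderName = (loader == null) ? "<bootstrap>" : loader.getClass().getName();
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();

        return cls.getName() + " loaded by " + loaderName + " from " + location;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Assumed default: the Avro class whose origin is in question.
        String name = (args.length > 0) ? args[0] : "org.apache.avro.Schema";

        // Resolve through the context classloader, which in a Flink task is
        // typically the user-code classloader.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Class<?> cls = Class.forName(name, false, loader);
        System.out.println(describe(cls));
    }
}
```

Logging `ClassOrigin.describe(...)` for the same class on the laptop, in the test Main program, and inside the failing task would show whether the three environments resolve Avro from the same jar.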

Re: classloading strangeness with Avro in Flink

Cliff Resnick
Hi Vino,

Thanks for the explanation, but the job only ever uses the Avro (1.8.2) pulled in by flink-formats/avro, so it's not a class version conflict there.

I'm using the default child-first loading. It might be a further transitive dependency, though that's not clear from the stack trace or from stepping through the process. When I get a chance I'll look into it further, but in case anyone is experiencing similar problems, what is clear is that classloader order does matter with Avro.

Re: classloading strangeness with Avro in Flink

Cliff Resnick
Hi Vino,

You were right in your assumption -- unshaded Avro was being added to our application jar via a third-party dependency. Excluding it in packaging fixed the issue. For the record, it looks like flink-avro must be loaded from the lib directory or there will be errors in checkpoint restores.
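
A stray copy of a library like this can often be spotted by listing every location from which a classloader can see the same class file. The sketch below assumes nothing beyond the JDK; the class name `DuplicateClassFinder` and the default target `org.apache.avro.Schema` are hypothetical choices for illustration, and the result depends on which classloader is queried.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DuplicateClassFinder {

    /** Returns every location at which the given loader can see the class file. */
    public static List<URL> findCopies(String className, ClassLoader loader) {
        // A class file is an ordinary resource named after the class.
        String resource = className.replace('.', '/') + ".class";
        try {
            return new ArrayList<>(Collections.list(loader.getResources(resource)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed default: the Avro class suspected of being present more than once.
        String name = (args.length > 0) ? args[0] : "org.apache.avro.Schema";

        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }

        List<URL> copies = findCopies(name, loader);
        System.out.println(copies.size() + " location(s) for " + name);
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

More than one location for the same class suggests that two jars ship it, which is the situation in which classloader order decides which copy wins.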

Re: classloading strangeness with Avro in Flink

Jamie Grier-2
Hey Cliff, can you provide the stack trace of the issue you were seeing?  We recently ran into a similar issue that we're still debugging.  Did it look like this:

java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
    at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
    ... 6 common frames omitted
00:17:19.626 INFO o.a.f.r.e.ExecutionGraph - Job ClientEventToElasticsearchJob (5cec438674e9a111703c83897f7c8138) switched from state RUNNING to FAILING.
java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
    at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
    ... 6 common frames omitted

-Jamie
 

Re: classloading strangeness with Avro in Flink

Cliff Resnick
Hi Jamie,

No, it was nothing of the class-not-found variety, just parse errors. It had to do with different versions of Avro getting mixed up.

-Cliff

Re: classloading strangeness with Avro in Flink

Cliff Resnick
Hi Vino,

Unfortunately, I'm still stuck here. By moving the Avro dependency chain to lib (and removing it from the user jar), my OCFs decode, but I get the error described here:


The Flink fix described in the PR above, however, was to move the Avro dependency into the user jar. Since I'm running on YARN, I'm required to have flink-shaded-hadoop2-uber.jar loaded from lib, and that jar bundles Avro unshaded. So I'm back to the original problem...

Any advice is welcome!

-Cliff


Re: classloading strangeness with Avro in Flink

vino yang
Hi Cliff,

If so, you can explicitly exclude Avro from the dependencies that pull it in (using <exclude>) and then add a direct dependency on the Avro version you need.

Thanks, vino.

Re: classloading strangeness with Avro in Flink

Aljoscha Krettek
Hi Cliff,

Do you actually need the flink-shaded-hadoop2-uber.jar in lib? If you're running on YARN, you should be able to just remove it, because with YARN you will have Hadoop on the classpath anyway.

Aljoscha

Re: classloading strangeness with Avro in Flink

Cliff Resnick
Hi Aljoscha,

We need flink-shaded-hadoop2-uber.jar because there is no Hadoop distro on the instance that the Flink session and jobs are managed from, and the process that launches Flink is not a Java process; it execs a process that calls the flink script.

-Cliff

On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek <[hidden email]> wrote:
Hi Cliff,

Do you actually need the flink-shaded-hadoop2-uber.jar in lib. If you're running on YARN, you should be able to just remove them because with YARN you will have Hadoop in the classpath anyways.

Aljoscha

On 21. Aug 2018, at 03:45, vino yang <[hidden email]> wrote:

Hi Cliff,

If so, you can explicitly exclude Avro's dependencies from related dependencies (using <exclude>) and then directly introduce dependencies on the Avro version you need.

Thanks, vino.

Cliff Resnick <[hidden email]> 于2018年8月21日周二 上午5:13写道:
Hi Vino,

Unfortunately, I'm still stuck here. By moving the avro dependency chain to lib (and removing it from user jar), my OCFs decode but I get the error described here:


However, the Flink fix described in the PR above was to move the Avro dependency to the user jar. However, since I'm using YARN, I'm required to have flink-shaded-hadoop2-uber.jar loaded from lib -- and that has avro bundled un-shaded. So I'm back to the start problem...

Any advice is welcome!

-Cliff


On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick <[hidden email]> wrote:
Hi Vino,

You were right in your assumption -- unshaded Avro was being added to our application jar via a third-party dependency. Excluding it in packaging fixed the issue. For the record, it looks like flink-avro must be loaded from lib or there will be errors in checkpoint restores.
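
For anyone hunting a similar stray copy, here is a minimal sketch of how one might list every location from which a class file is visible to a class loader. The class name DuplicateClassFinder and the method locations are hypothetical names for illustration; only standard JDK calls are used, and the default class name org.apache.avro.Schema is just an example of what one might look for.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DuplicateClassFinder {

    /** Returns every location from which the given class file is visible to the loader. */
    public static List<URL> locations(String className, ClassLoader loader) throws IOException {
        // Class files are looked up as resources, e.g. org/apache/avro/Schema.class
        String resource = className.replace('.', '/') + ".class";
        return new ArrayList<>(Collections.list(loader.getResources(resource)));
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the class of interest is passed as the first argument.
        String name = args.length > 0 ? args[0] : "org.apache.avro.Schema";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (URL url : locations(name, loader)) {
            System.out.println(url);
        }
    }
}
```

More than one printed location would suggest the class is packaged in more than one jar on that loader's search path; it does not by itself say which copy is actually used.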

On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick <[hidden email]> wrote:
Hi Vino,

Thanks for the explanation, but the job only ever uses the Avro (1.8.2) pulled in by flink-formats/avro, so it's not a class version conflict there. 

I'm using the default child-first loading. It might be a further transitive dependency, though that's not clear from the stack trace or from stepping through the process. When I get a chance I'll look into it further, but in case anyone is experiencing similar problems, what is clear is that classloader order does matter with Avro.
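
Since neither the stack trace nor the debugger made the origin obvious, one option is to log where a class was actually loaded from. The following is a small sketch under that assumption; ClassOrigin and describe are hypothetical names, and in a real job one would pass an Avro class obtained inside the task rather than the default used here.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes the defining class loader and the code location of a class. */
    public static String describe(Class<?> clazz) {
        ClassLoader loader = clazz.getClassLoader(); // null means the bootstrap loader
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "(no code source)"
                : source.getLocation().toString();
        String loaderName = (loader == null) ? "bootstrap" : loader.getClass().getName();
        return clazz.getName() + " <- " + loaderName + " @ " + location;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Assumption: the class of interest is passed as the first argument.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(name)));
    }
}
```

Calling describe on the same class from the Kafka path and from the OCF path would show whether both paths see the class through the same loader and jar.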

On Sun, Aug 19, 2018, 11:36 PM vino yang <[hidden email]> wrote:
Hi Cliff,

My personal guess is that this may be caused by a conflict between the job's Avro and the Avro that the Flink framework itself relies on.
Flink provides some configuration parameters that allow you to determine the order of the classloaders yourself. [1]
Alternatively, you can refer to the documentation on debugging classloading. [2]


Thanks, vino.

On Mon, Aug 20, 2018 at 10:40 AM Cliff Resnick <[hidden email]> wrote:
Our Flink/YARN pipeline has been reading Avro from Kafka for a while now. We just introduced a source of Avro OCF (Object Container Files) read from S3. The Kafka Avro continued to decode without incident, but the OCF files failed 100% with anomalous parse errors in the decoding phase after the schema and codec were successfully read from them. The pipeline would work on my laptop, and when I submitted a test Main program to the Flink Session in YARN, that would also successfully decode. Only the actual pipeline run from the TaskManager failed. At one point I even remote debugged the TaskManager process and stepped through what looked like a normal Avro decode (if you can describe Avro code as normal!) -- until it abruptly failed with an int decode or what-have-you.

This stumped me for a while, but I finally tried moving flink-avro.jar from the lib to the application jar, and that fixed it. I'm not sure why this is, especially since there were no typical classloader-type errors.  This issue was observed both on Flink 1.5 and 1.6 in Flip-6 mode.

-Cliff







Re: classloading strangeness with Avro in Flink

Cliff Resnick
Solved this by moving flink-avro to lib and reverting to `classloader.resolve-order: parent-first`. I still don't know why, but I guess if you're reading Avro both from file and Kafka in the same pipeline then inverted class loader delegation will not work. Thanks, Vino, for your help!
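
To illustrate what "inverted" (child-first) delegation means compared with parent-first, here is a minimal sketch. It is not Flink's actual implementation; ChildFirstLoader and ResolveOrderSketch are hypothetical names, and always sending java.* classes to the parent is an assumption of this sketch. A child-first loader looks in its own jars before asking its parent, so a class present in both the user jar and lib resolves to the user jar's copy; with parent-first the copy in lib wins.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ResolveOrderSketch {

    /** Hypothetical child-first loader: tries its own URLs before delegating to the parent. */
    public static class ChildFirstLoader extends URLClassLoader {

        public ChildFirstLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    if (name.startsWith("java.")) {
                        // Core JDK classes always come from the parent chain.
                        c = super.loadClass(name, false);
                    } else {
                        try {
                            c = findClass(name);              // child first: own URLs
                        } catch (ClassNotFoundException e) {
                            c = super.loadClass(name, false); // not found here: ask the parent
                        }
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // With no URLs of its own, the child-first loader falls through to the parent.
        try (ChildFirstLoader loader =
                 new ChildFirstLoader(new URL[0], ResolveOrderSketch.class.getClassLoader())) {
            Class<?> c = loader.loadClass("java.util.ArrayList");
            System.out.println(c.getName() + " loaded by " + c.getClassLoader());
        }
    }
}
```

The default URLClassLoader behaviour (parent-first) is what you get by not overriding loadClass at all.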

On Tue, Aug 21, 2018 at 8:02 AM Cliff Resnick <[hidden email]> wrote:
Hi Aljoscha,

We need flink-shaded-hadoop2-uber.jar because there is no Hadoop distro on the instance from which the Flink session and jobs are managed, and the process that launches Flink is not a Java process; it execs a process that calls the flink script.

-Cliff








Re: classloading strangeness with Avro in Flink

vino yang
Hi Cliff,

You are welcome. I am very happy to hear that.

Thanks, vino.

On Tue, Aug 21, 2018 at 11:46 PM Cliff Resnick <[hidden email]> wrote:
Solved this by moving flink-avro to lib and reverting to `classloader.resolve-order: parent-first`. I still don't know why, but I guess if you're reading Avro both from file and Kafka in the same pipeline then inverted class loader delegation will not work. Thanks, Vino, for your help!
