Our Flink/YARN pipeline has been reading Avro from Kafka for a while now. We recently introduced a source of Avro OCF (Object Container Files) read from S3. The Kafka Avro continued to decode without incident, but the OCF files failed every time with anomalous parse errors in the decoding phase, even though the schema and codec were read successfully from the file header. The pipeline worked on my laptop, and a test Main program submitted to the Flink session in YARN also decoded successfully. Only the actual pipeline, running in the TaskManager, failed. At one point I even remote-debugged the TaskManager process and stepped through what looked like a normal Avro decode (if you can describe Avro code as normal!) -- until it abruptly failed with an int decode or what-have-you. This stumped me for a while, but I finally tried moving flink-avro.jar from lib to the application jar, and that fixed it. I'm not sure why, especially since there were none of the typical classloader-type errors. This issue was observed on both Flink 1.5 and 1.6 in FLIP-6 mode. -Cliff
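For context, the OCF source is wired up roughly like the sketch below -- the record class and S3 path are placeholders rather than our actual ones, and it assumes a code-generated Avro SpecificRecord class (here called MyRecord):

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OcfSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AvroInputFormat reads the writer schema and codec from the OCF header,
        // then decodes each record -- the phase where the parse errors appeared.
        // "MyRecord" is a placeholder for a generated Avro SpecificRecord class.
        AvroInputFormat<MyRecord> ocfFormat =
                new AvroInputFormat<>(new Path("s3://my-bucket/avro/"), MyRecord.class);

        DataStream<MyRecord> ocfStream = env.createInput(ocfFormat);
        ocfStream.print();

        env.execute("OCF decode sketch");
    }
}
```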
Hi Cliff, My guess is that this is caused by a conflict between the Avro bundled with your job and the Avro that the Flink framework itself relies on. Flink provides configuration parameters that let you control the classloader resolution order yourself. [1] The documentation on debugging classloading may also be helpful. [2] Thanks, vino. Cliff Resnick <[hidden email]> wrote on Mon, Aug 20, 2018 at 10:40 AM:
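For reference, the resolution order is set in flink-conf.yaml; a minimal sketch of the relevant setting:

```yaml
# flink-conf.yaml -- controls whether classes are resolved from the user jar
# first (child-first, the default) or from Flink's lib/ first (parent-first).
classloader.resolve-order: child-first
# classloader.resolve-order: parent-first
```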
Hi Vino,
Thanks for the explanation, but the job only ever uses the Avro (1.8.2) pulled in by flink-formats/avro, so it's not a class version conflict there. I'm using default child-first loading. It might be a further transitive dependency, though it's not clear from the stack trace or from stepping through the process. When I get a chance I'll look further into it, but in case anyone is experiencing similar problems, what is clear is that classloader order does matter with Avro. On Sun, Aug 19, 2018, 11:36 PM vino yang <[hidden email]> wrote:
Hi Vino, You were right in your assumption -- unshaded Avro was being added to our application jar via a third-party dependency. Excluding it in packaging fixed the issue. For the record, it looks like flink-avro must be loaded from lib or there will be errors in checkpoint restores. On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick <[hidden email]> wrote:
Hey Cliff, can you provide the stack trace of the issue you were seeing? We recently ran into a similar issue that we're still debugging. Did it look like this: java.lang.IllegalStateException: Could not initialize operator state backend. -Jamie On Mon, Aug 20, 2018 at 10:42 AM, Cliff Resnick <[hidden email]> wrote:
Hi Jamie, No, it was nothing of the class-not-found variety, just parse errors. It had to do with different Avro versions getting mixed up. -Cliff On Mon, Aug 20, 2018 at 4:18 PM Jamie Grier <[hidden email]> wrote:
Hi Vino, Unfortunately, I'm still stuck here. By moving the Avro dependency chain to lib (and removing it from the user jar), my OCFs decode, but I get the error described here: However, the Flink fix described in that PR was to move the Avro dependency to the user jar. And since I'm using YARN, I'm required to have flink-shaded-hadoop2-uber.jar loaded from lib -- and that bundles Avro un-shaded. So I'm back to the original problem... Any advice is welcome! -Cliff On Mon, Aug 20, 2018 at 1:42 PM Cliff Resnick <[hidden email]> wrote:
Hi Cliff, If so, you can explicitly exclude the transitive Avro dependency from the offending dependencies (using an exclusion in your build) and then declare a direct dependency on the Avro version you need. Thanks, vino. Cliff Resnick <[hidden email]> wrote on Tue, Aug 21, 2018 at 5:13 AM:
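For example, with Maven it would look roughly like this sketch (the third-party coordinates are placeholders, not from your build):

```xml
<!-- Sketch only: "some-third-party" stands in for whatever dependency is
     dragging in unshaded Avro; exclude it here and rely on the Avro version
     provided via flink-avro (or declare the version you need directly). -->
<dependency>
  <groupId>com.example</groupId>
  <artifactId>some-third-party</artifactId>
  <version>1.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```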
Hi Cliff,
Do you actually need flink-shaded-hadoop2-uber.jar in lib? If you're running on YARN, you should be able to just remove it, because with YARN you will have Hadoop on the classpath anyway. Aljoscha
Hi Aljoscha, We need flink-shaded-hadoop2-uber.jar because there is no Hadoop distro on the instance the Flink session/jobs are managed from, and the process that launches Flink is not a Java process but execs a process that calls the flink script. -Cliff On Tue, Aug 21, 2018 at 5:11 AM Aljoscha Krettek <[hidden email]> wrote:
Solved this by moving flink-avro to lib and reverting to `classloader.resolve-order: parent-first`. I still don't know why, but I guess if you're reading Avro both from files and from Kafka in the same pipeline, then inverted classloader delegation will not work. Thanks, Vino, for your help! On Tue, Aug 21, 2018 at 8:02 AM Cliff Resnick <[hidden email]> wrote:
Hi Cliff, You're welcome, I'm very glad to hear it. Thanks, vino. Cliff Resnick <[hidden email]> wrote on Tue, Aug 21, 2018 at 11:46 PM: