Hey all,

I am trying out a POC with Flink on YARN. My simple goal is to read from a Hive ORC table, process some data, and write to a new Hive ORC table. Currently I can get Flink to read the source table fine, both by using the HCatalog input format directly and by using the flink-hcatalog wrapper.
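For reference, the read side looks roughly like this (a minimal sketch; the database/table names are placeholders, and the wiring is assumed from the flink-hadoop-compatibility and hive-hcatalog-core APIs rather than taken verbatim from this thread):

    // Sketch: read a Hive table through HCatInputFormat wrapped in Flink's
    // Hadoop-compatibility input format. "mydb"/"mytable" are placeholders.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    HCatInputFormat.setInput(job, "mydb", "mytable");

    HadoopInputFormat<WritableComparable, HCatRecord> input = new HadoopInputFormat<>(
        new HCatInputFormat(), WritableComparable.class, HCatRecord.class, job);

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<WritableComparable, HCatRecord>> rows = env.createInput(input);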
Processing the data also works fine. Dumping to the console or a text file also works fine. I'm now stuck trying to write the data out; I'm getting ClassNotFoundExceptions:

    Caused by: java.lang.ClassNotFoundException: ...
            at java.net.URLClassLoader.findClass(...)
            at java.lang.ClassLoader.loadClass(...)
            at sun.misc.Launcher$AppClassLoader.loadClass(...)
            at java.lang.ClassLoader.loadClass(...)
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:...)
            at org.apache.hadoop.hive.common.JavaUtils.loadClass(...)
            at org.apache.hadoop.hive.common.JavaUtils.loadClass(...)
            at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(...)
            at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(...)
Since I read from an ORC table, I know I have that class on my classpath. So I'm wondering: does each stage/step in a Flink process have some kind of special classloader that I'm not aware of? (It's also odd that it wants the input format and not the output format; not sure why yet.)

My output code looks like this:

    Job job = Job.getInstance(conf);
    // "mydb"/"mytable" are placeholder names
    HCatOutputFormat.setOutput(job, OutputJobInfo.create("mydb", "mytable", null));
    HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
    HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
    HCatOutputFormat outputFormat = new HCatOutputFormat();
    HadoopOutputFormat<WritableComparable<?>, HCatRecord> out =
        new HadoopOutputFormat<>(outputFormat, job);
    // hcat is the DataSet from the previous processing step
    hcat.output(out);
    env.execute("run");

One other thing to note: I had to put the flink-hadoop-compatibility_2.x jar on my classpath to use the HadoopOutputFormat wrapper.

Any help or explanation into how the classpath/classloading works would be wonderful!
Hi Garrett,

Flink distinguishes between two classloaders: 1) the system classloader, which is the main classloader of the process and loads all jars in the ./lib folder, and 2) the user classloader, which loads the job jar. AFAIK, the different operators do not have distinct classloaders, so in principle all operators should use the same user classloader.

This looks suspicious because I would rather expect the OrcOutputFormat to be the problem than the input format. Can you post more of the stacktrace? This would help to identify the spot in the Flink code where the exception is thrown.
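To see which classloader is in play at a given point, a probe like the following can help (a hypothetical diagnostic, not from this thread; the ORC class name is only an example to test against):

    // Hypothetical diagnostic helper: logs the thread's context classloader,
    // this class's own classloader, and whether a given class resolves here.
    public final class ClassLoaderProbe {
        public static void probe(String label, String className) {
            System.out.println(label + ": context CL = "
                + Thread.currentThread().getContextClassLoader());
            System.out.println(label + ": own CL = "
                + ClassLoaderProbe.class.getClassLoader());
            try {
                Class<?> clazz = Class.forName(className);
                System.out.println(label + ": " + className + " loaded by "
                    + clazz.getClassLoader());
            } catch (ClassNotFoundException e) {
                System.out.println(label + ": " + className + " NOT visible here");
            }
        }
    }

    // e.g., once on the client and once inside an operator's open():
    // ClassLoaderProbe.probe("mapper", "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");

Thanks, Fabian

2017-09-18 18:42 GMT+02:00 Garrett Barton <[hidden email]>: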
Fabian,

It looks like Hive instantiates both input and output formats when doing either. I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler where it tries to load both.

It looks like it's happening after the writes complete, when Flink is in the finish/finalize stage. When I watch the counters in the Flink UI, I see all output tasks marked finished, with bytes sent and records sent being exactly what I expect them to be. The first error also mentions the master; is this the Flink JobManager process then? The expanded stacktrace is:

    Caused by: java.lang.Exception: Failed to finalize execution on master
            at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
            at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
            at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
            at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
            at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
            at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
            at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
            ... 8 more
    Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
            at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
            at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
            at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
            ... 14 more
    Caused by: java.io.IOException: Failed to load foster storage handler
            at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
            at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
            at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
            at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
            at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
            ... 16 more
    Caused by: java.lang.ClassNotFoundException: ...
            at java.net.URLClassLoader.findClass(...)
            at java.lang.ClassLoader.loadClass(...)
            at sun.misc.Launcher$AppClassLoader.loadClass(...)
            at java.lang.ClassLoader.loadClass(...)
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:...)
            at org.apache.hadoop.hive.common.JavaUtils.loadClass(...)
            at org.apache.hadoop.hive.common.JavaUtils.loadClass(...)
            at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(...)
            at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(...)
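For context, the fallback in question looks roughly like this (a simplified paraphrase of the Hive 1.2.1 HCatUtil.getStorageHandler path; variable names are assumptions, not the verbatim source):

    // Simplified paraphrase, not verbatim Hive source: when a table has no
    // explicit storage handler, HCat wraps the table's InputFormat,
    // OutputFormat, and SerDe class names in a FosterStorageHandler, whose
    // constructor resolves all three eagerly -- so even a pure write needs
    // the InputFormat class on the classpath.
    if (storageHandlerClassName == null) {
        try {
            FosterStorageHandler handler =
                new FosterStorageHandler(inputFormatName, outputFormatName, serdeName);
            handler.setConf(conf);
            return handler;
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to load foster storage handler", e);
        }
    }

Thank you all for any help. :)

On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <[hidden email]> wrote: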
Hi Garrett,

I think I identified the problem. You said you put the Hive/HCat dependencies into your user fat jar, correct? In this case, they are loaded with Flink's user classloader (as described before).

In the OutputFormatVertex.finalizeOnMaster() method, Flink calls the output format's finalizeGlobal() method on the master, i.e., the JobManager. However, when HCatOutputFormat.getOutputCommitter() is called there, HCat resolves its classes through the thread's context classloader, which at that point does not know the classes in your fat jar. This behavior is actually OK, because we usually set the context classloader to be the user classloader before calling user code. However, this has not been done here. So, this is in fact a bug.

I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656 and will open a PR for that.
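The general shape of such a fix is to swap the context classloader around the user-code call, as sketched below (illustrative only, not the actual FLINK-7656 patch; userCodeClassLoader, outputFormat, and parallelism are assumed to be in scope on the JobManager side):

    // Sketch of the context-classloader pattern (not the literal patch):
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        // user code can now resolve classes from the job jar
        outputFormat.finalizeGlobal(parallelism);
    } finally {
        Thread.currentThread().setContextClassLoader(original);
    }

Thanks for helping to diagnose the issue,
Fabian

2017-09-19 22:05 GMT+02:00 Garrett Barton <[hidden email]>: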
I've opened a pull request with the fix.

Best, Fabian

2017-09-20 16:15 GMT+02:00 Fabian Hueske <[hidden email]>:
Fabian,

Awesome! After your initial email I got things to work by deploying my fat jar into the flink/lib folder, and voila! It worked. :) I will grab your pull request and give it a go tomorrow.

On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <[hidden email]> wrote:
Fabian,

Just to follow up on this: I took the patch, compiled that class, and stuck it into the existing 1.3.2 jar, and all is well. (I couldn't get all of Flink to build correctly.) Thank you!

On Wed, Sep 20, 2017 at 3:53 PM, Garrett Barton <[hidden email]> wrote:
Thanks for the feedback, Garrett! Good to know that this fixes the problem. The patch will be included in the next releases.

Best, Fabian

2017-10-06 20:31 GMT+02:00 Garrett Barton <[hidden email]>: