Hi,

I am trying to analyze and update a MongoDB collection with Apache Flink 0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0. The job fails with the following exception:

java.lang.IllegalArgumentException: Can not create a Path from a null string
    at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
    at org.apache.hadoop.fs.Path.<init>(Path.java:135)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

====================================================

Configuration conf = new Configuration();
conf.set("mapred.output.dir", "/tmp/");
conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);

Job job = Job.getInstance(conf);

// create a MongodbInputFormat, using a Hadoop input format wrapper
InputFormat<Object, BSONObject> mapreduceInputFormat =
        new MyMongoInputFormat<Object, BSONObject>();
HadoopInputFormat<Object, BSONObject> hdIf =
        new HadoopInputFormat<Object, BSONObject>(
                mapreduceInputFormat, Object.class, BSONObject.class, job);

DataSet<Tuple2<Text, BSONWritable>> fin = input
        .flatMap(new myFlatMapFunction()).setParallelism(16);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);

fin.output(new HadoopOutputFormat<Text, BSONWritable>(
        new MongoOutputFormat<Text, BSONWritable>(), job));

// fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
While debugging, it seems that the commitTask method of the MongoOutputCommitter is never called. Is it possible that the 'bulk' approach of mongo-hadoop 1.4 does not fit the task execution method of Flink? Any ideas? Thanks a lot in advance.

Stefano Bortoli, PhD
Email: [hidden email]
Phone nr: +39 0461 1823912
Headquarters: Trento (Italy), Via Trener 8

2015-07-22 14:26 GMT+02:00 Stefano Bortoli <[hidden email]>:
In fact, in close() of the HadoopOutputFormat, the check

    if (this.fileOutputCommitter.needsTaskCommit(this.context))

returns false. Also, both close() and finalizeGlobal() use a FileOutputCommitter, and never the MongoOutputCommitter:

/**
 * commit the task by moving the output file out from the temporary directory.
 * @throws java.io.IOException
 */
@Override
public void close() throws IOException {
    this.recordWriter.close(new HadoopDummyReporter());
    if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
        this.fileOutputCommitter.commitTask(this.context);
    }
}

@Override
public void finalizeGlobal(int parallelism) throws IOException {
    try {
        JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
        FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
        // finalize HDFS output format
        fileOutputCommitter.commitJob(jobContext);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Can anyone have a look into that?

saluti,
Stefano

2015-07-22 15:53 GMT+02:00 Stefano Bortoli <[hidden email]>:
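To illustrate the behaviour described above, here is a minimal toy model, not Flink or Hadoop code. Committer, ToyFileCommitter and ToyBulkCommitter are hypothetical stand-ins, and the assumption that the Mongo side only stores its documents inside commitTask() is taken from the 'bulk' description earlier in the thread. It only shows that when close() consults a file committer that has nothing to commit, the other committer's commitTask() is never reached.

```java
import java.util.ArrayList;
import java.util.List;

class HardCodedCommitterDemo {

    /** Hypothetical stand-in for an output committer (only two methods modelled). */
    interface Committer {
        boolean needsTaskCommit();
        void commitTask();
    }

    /** Toy file committer: nothing to commit unless temporary files were written. */
    static class ToyFileCommitter implements Committer {
        private final boolean tempFilesExist;
        ToyFileCommitter(boolean tempFilesExist) { this.tempFilesExist = tempFilesExist; }
        public boolean needsTaskCommit() { return tempFilesExist; }
        public void commitTask() { /* would move temporary files to the output dir */ }
    }

    /** Toy bulk committer (assumption): documents are only stored in commitTask(). */
    static class ToyBulkCommitter implements Committer {
        final List<String> buffered = new ArrayList<>();
        final List<String> stored = new ArrayList<>();
        public boolean needsTaskCommit() { return !buffered.isEmpty(); }
        public void commitTask() { stored.addAll(buffered); buffered.clear(); }
    }

    /** Same shape as the quoted close(): only the committer passed in is consulted. */
    static void close(Committer committer) {
        if (committer.needsTaskCommit()) {
            committer.commitTask();
        }
    }

    public static void main(String[] args) {
        ToyBulkCommitter bulk = new ToyBulkCommitter();
        bulk.buffered.add("doc-1");

        // The wrapper asks a file committer, and the sink wrote no temporary files.
        close(new ToyFileCommitter(false));
        System.out.println("stored after close: " + bulk.stored.size()); // prints 0
    }
}
```

Calling close(bulk) instead would store the buffered document, which is the effect the later messages in the thread are after.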
A simple solution would be to:

1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat that takes the OutputCommitter as a parameter

2015-07-22 16:48 GMT+02:00 Stefano Bortoli <[hidden email]>:
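The constructor idea can be sketched roughly as follows. This is only the shape of the proposal, under stated assumptions: Committer, RecordingCommitter and OutputWrapper are hypothetical stand-ins, not the actual HadoopOutputFormatBase or Hadoop's OutputCommitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class InjectedCommitterSketch {

    /** Hypothetical stand-in for an output committer. */
    interface Committer {
        boolean needsTaskCommit();
        void commitTask();
    }

    /** Toy committer that records whether commitTask() ran. */
    static class RecordingCommitter implements Committer {
        final List<String> log = new ArrayList<>();
        public boolean needsTaskCommit() { return true; }
        public void commitTask() { log.add("commitTask"); }
    }

    /** Hypothetical wrapper: the committer is a constructor parameter, not hard-coded. */
    static class OutputWrapper {
        private final Committer committer;

        OutputWrapper(Committer committer) {
            this.committer = Objects.requireNonNull(committer, "committer");
        }

        /** Commits through whichever committer the caller supplied. */
        void close() {
            if (committer.needsTaskCommit()) {
                committer.commitTask();
            }
        }
    }

    public static void main(String[] args) {
        RecordingCommitter mongoLike = new RecordingCommitter();
        new OutputWrapper(mongoLike).close();
        System.out.println(mongoLike.log); // prints [commitTask]
    }
}
```

One thing the sketch ignores: Flink serializes output format objects before running them, so a real version would probably have to create the committer in open() or require it to be serializable.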
In reply to this post by Stefano Bortoli
Thanks for reporting this, Stefano!

It seems the HadoopOutputFormat wrapper is pretty much specialized for file output formats. Can you open an issue for that? Someone will need to look into this...

On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli <[hidden email]> wrote:
Meanwhile, I have implemented a MongoHadoopOutputFormat that overrides the open, close and finalizeGlobal methods.

saluti,

2015-07-22 17:11 GMT+02:00 Stephan Ewen <[hidden email]>:
Does this make the MongoHadoopOutputFormat work for you?

On Thu, Jul 23, 2015 at 12:44 PM, Stefano Bortoli <[hidden email]> wrote:
Yes, it does. :-) I have implemented it with Hadoop1 and Hadoop2. Essentially, I have extended the HadoopOutputFormat, reusing part of the code of the HadoopOutputFormatBase, and set the MongoOutputCommitter to replace the FileOutputCommitter.

saluti,
Stefano Bortoli

2015-07-23 13:31 GMT+02:00 Stephan Ewen <[hidden email]>: