MongoOutputFormat does not write back to collection


Stefano Bortoli
Hi,

I am trying to analyze and update a MongoDB collection with Apache Flink 0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.

The process is fairly simple: the MongoInputFormat works smoothly, but nothing is written back to the collection. The processing itself works, because writeAsText produces the expected output. I am quite puzzled, because while debugging I can see it writing to some temporary directory.

The mapred.output.dir property seems to serve only to output a file named _SUCCESS, and if I do not set it, the job fails with:
java.lang.IllegalArgumentException: Can not create a Path from a null string
    at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
    at org.apache.hadoop.fs.Path.<init>(Path.java:135)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Has anyone experienced something similar? Any hints on where to look? Thanks a lot in advance!

saluti,
Stefano

====================================================
Configuration conf = new Configuration();
conf.set("mapred.output.dir", "/tmp/");
conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);

Job job = Job.getInstance(conf);

// create a Mongo input format, wrapped in Flink's Hadoop input format wrapper
InputFormat<Object, BSONObject> mapreduceInputFormat = new MyMongoInputFormat<Object, BSONObject>();
HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object, BSONObject>(
        mapreduceInputFormat, Object.class, BSONObject.class, job);

// read the collection ('env' is the ExecutionEnvironment)
DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);

DataSet<Tuple2<Text, BSONWritable>> fin = input
        .flatMap(new myFlatMapFunction()).setParallelism(16);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);

// write back through the wrapped MongoOutputFormat -- this is what does not work
fin.output(new HadoopOutputFormat<Text, BSONWritable>(
        new MongoOutputFormat<Text, BSONWritable>(), job));
// fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);   // this alternative works as expected


Re: MongoOutputFormat does not write back to collection

Stefano Bortoli-2
While debugging, it seems the commitTask method of the MongoOutputCommitter is never called. Is it possible that the 'bulk' approach of mongo-hadoop 1.4 does not fit Flink's task execution model?

Any idea? Thanks a lot in advance.

saluti,
Stefano


Re: MongoOutputFormat does not write back to collection

Stefano Bortoli
In fact, in close() of the HadoopOutputFormat, the check this.fileOutputCommitter.needsTaskCommit(this.context) returns false, so the task is never committed:

    /**
     * commit the task by moving the output file out from the temporary directory.
     * @throws java.io.IOException
     */
    @Override
    public void close() throws IOException {
        this.recordWriter.close(new HadoopDummyReporter());

        if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
            this.fileOutputCommitter.commitTask(this.context);
        }
    }


Also, both close() and finalizeGlobal() use a FileOutputCommitter, and never the MongoOutputCommitter:

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
            FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();

            // finalize HDFS output format
            fileOutputCommitter.commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Can anyone have a look into this?

saluti,
Stefano


Re: MongoOutputFormat does not write back to collection

Stefano Bortoli
A simple solution would be to:

1 - add a constructor to HadoopOutputFormatBase and HadoopOutputFormat that takes the OutputCommitter as a parameter;
2 - change the outputCommitter field of HadoopOutputFormatBase to a generic OutputCommitter;
3 - remove the hard-coded FileOutputCommitter assignment in open() and finalizeGlobal(), or keep it only as a default when no specific committer is assigned (see the sketch below).
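
A rough sketch of points 1-3, using the field names from the snippets above (this is not the actual Flink code, just an illustration):

    // (2) generic committer instead of a FileOutputCommitter field
    private OutputCommitter outputCommitter;

    // (1) new constructor so the caller can supply a committer, e.g. a MongoOutputCommitter
    public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
            Job job, OutputCommitter outputCommitter) {
        this(mapreduceOutputFormat, job);
        this.outputCommitter = outputCommitter;
    }

    // (3) in open() and finalizeGlobal(), keep the file committer only as the fallback
    if (this.outputCommitter == null) {
        this.outputCommitter = new FileOutputCommitter(
                new Path(this.configuration.get("mapred.output.dir")), this.context);
    }

Alternatively, the wrapper could skip the constructor parameter and ask the wrapped format for its committer via mapreduceOutputFormat.getOutputCommitter(context); for MongoOutputFormat that should return a MongoOutputCommitter.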

saluti,
Stefano


Re: MongoOutputFormat does not write back to collection

Stephan Ewen
Thanks for reporting this, Stefano!

Seems like the HadoopOutputFormat wrapper is pretty much specialized for file output formats.

Can you open an issue for that? Someone will need to look into this...


Re: MongoOutputFormat does not write back to collection

Stefano Bortoli
Meanwhile, I have implemented a MongoHadoopOutputFormat overriding the open(), close() and finalizeGlobal() methods.
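
Condensed, it looks roughly like this. Take it as a sketch: the member names (mapreduceOutputFormat, recordWriter, context, configuration) mirror HadoopOutputFormatBase, whose code I partly reused, and the TaskAttemptContext setup in open() is elided:

public class MongoHadoopOutputFormat<K, V> extends HadoopOutputFormat<K, V> {

    private OutputCommitter outputCommitter;

    public MongoHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K, V> format, Job job) {
        super(format, job);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // same context / record writer setup as HadoopOutputFormatBase, but the
        // committer comes from the wrapped format instead of being a hard-coded
        // FileOutputCommitter
        try {
            this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
            this.outputCommitter.setupTask(this.context);
            this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
        } catch (InterruptedException e) {
            throw new IOException("Could not create the committer / record writer.", e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            this.recordWriter.close(this.context);
        } catch (InterruptedException e) {
            throw new IOException("Could not close the record writer.", e);
        }
        if (this.outputCommitter.needsTaskCommit(this.context)) {
            this.outputCommitter.commitTask(this.context);   // the call that never happened before
        }
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
            TaskAttemptContext taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, new TaskAttemptID());
            // commit the job with the Mongo committer, not a FileOutputCommitter
            this.mapreduceOutputFormat.getOutputCommitter(taskContext).commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

The key point is taking the committer from the wrapped format via getOutputCommitter(), so that mongo-hadoop's bulk writes are actually committed both per task and per job.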

saluti,
Stefano


Re: MongoOutputFormat does not write back to collection

Stephan Ewen
Does this make the MongoHadoopOutputFormat work for you?


Re: MongoOutputFormat does not write back to collection

Stefano Bortoli-2
Yes it does. :-) I have implemented it for both Hadoop1 and Hadoop2. Essentially, I extended the HadoopOutputFormat, reusing part of the code of the HadoopOutputFormatBase, and set the MongoOutputCommitter in place of the FileOutputCommitter.
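
With that in place, the sink wiring from my first mail stays the same, just with the new wrapper class (MongoHadoopOutputFormat is my own class sketched above, not something shipped with Flink or mongo-hadoop):

fin.output(new MongoHadoopOutputFormat<Text, BSONWritable>(
        new MongoOutputFormat<Text, BSONWritable>(), job));

and the mapred.output.dir workaround should no longer be needed, since nothing instantiates a FileOutputCommitter anymore.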

saluti,
Stefano


