Does HadoopOutputFormat create MapReduce job internally?

Does HadoopOutputFormat create MapReduce job internally?

morven huang

Hi,

I'd like to sink my data into HDFS using SequenceFileAsBinaryOutputFormat with compression, and I found an approach at https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/hadoop_compatibility.html. The code works, but I'm curious to know: since it creates a MapReduce Job instance here, does this Flink application create and run a MapReduce job underneath? If so, will it kill performance?

I tried to figure it out by looking into the logs, but couldn't get a clue. I hope someone can shed some light here. Thank you.


import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Wrap Hadoop's SequenceFileAsBinaryOutputFormat in Flink's HadoopOutputFormat.
Job job = Job.getInstance();
HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF =
        new HadoopOutputFormat<>(new SequenceFileAsBinaryOutputFormat(), job);

// Enable block-level compression for the sequence file output.
hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
        CompressionType.BLOCK.toString());

// setOutputPath is a static method inherited from FileOutputFormat, so calling
// it via TextOutputFormat works for any FileOutputFormat subclass.
TextOutputFormat.setOutputPath(job, new Path("hdfs://..."));

dataset.output(hadoopOF);
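
For context, HadoopOutputFormat<K, V> is a Flink OutputFormat over Tuple2<K, V>, so the dataset above must have element type Tuple2<BytesWritable, BytesWritable>. A minimal end-to-end sketch of a driver around the snippet might look like this (the element values are invented for illustration):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.BytesWritable;

// Hypothetical driver: build a tiny DataSet of writable pairs and sink it
// through the HadoopOutputFormat configured above.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<BytesWritable, BytesWritable>> dataset = env.fromElements(
        Tuple2.of(new BytesWritable("key1".getBytes()), new BytesWritable("value1".getBytes())),
        Tuple2.of(new BytesWritable("key2".getBytes()), new BytesWritable("value2".getBytes())));
dataset.output(hadoopOF);
env.execute("sequence-file-sink");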

Re: Does HadoopOutputFormat create MapReduce job internally?

Fabian Hueske
Hi,

Flink's Hadoop compatibility layer simply wraps code that was implemented against Hadoop's interfaces in classes that implement Flink's own interfaces.
No Hadoop cluster is started and no MapReduce job is executed.

Job is just a class of the Hadoop API. Instantiating it does not mean that a Hadoop job is executed.

Best,
Fabian
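
Conceptually, the wrapping Fabian describes boils down to something like the sketch below: a class implementing Flink's OutputFormat that delegates each record to a Hadoop RecordWriter. This is a simplified illustration, not Flink's actual source; the real HadoopOutputFormat additionally handles the output committer, serialization of the Hadoop configuration, and output file naming.

import java.io.IOException;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

// Simplified sketch (not Flink's real implementation): a Flink OutputFormat
// that drives a wrapped Hadoop OutputFormat directly. Writing a record is a
// plain method call into Hadoop code; no cluster, no MapReduce job.
public class SketchedHadoopOutputFormat<K, V> implements OutputFormat<Tuple2<K, V>> {

    // The real wrapper serializes these fields properly; elided here.
    private final org.apache.hadoop.mapreduce.OutputFormat<K, V> hadoopFormat;
    private final org.apache.hadoop.conf.Configuration hadoopConf;

    private transient RecordWriter<K, V> writer;
    private transient TaskAttemptContextImpl context;

    public SketchedHadoopOutputFormat(
            org.apache.hadoop.mapreduce.OutputFormat<K, V> hadoopFormat,
            org.apache.hadoop.conf.Configuration hadoopConf) {
        this.hadoopFormat = hadoopFormat;
        this.hadoopConf = hadoopConf;
    }

    @Override
    public void configure(Configuration parameters) {
        // Nothing to configure on the Flink side in this sketch.
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Fabricate a task attempt id so the Hadoop format can name its
        // output file; the real wrapper also runs the OutputCommitter.
        TaskAttemptID attempt =
                new TaskAttemptID("flink", 0, TaskType.REDUCE, taskNumber, 0);
        context = new TaskAttemptContextImpl(hadoopConf, attempt);
        try {
            writer = hadoopFormat.getRecordWriter(context);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void writeRecord(Tuple2<K, V> record) throws IOException {
        try {
            writer.write(record.f0, record.f1); // direct call into Hadoop code
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            writer.close(context);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}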




Re: Does HadoopOutputFormat create MapReduce job internally?

morven huang
Hi Fabian,

Thank you for the clarification.

Best,
Morven Huang
