Is copying flink-hadoop-compatibility jar to FLINK_HOME/lib the only way to make it work?


morven huang

Hi,

I'm using Flink 1.5.6 and Hadoop 2.7.1.

My requirement is to read an HDFS sequence file (SequenceFileInputFormat) and then write it back to HDFS (SequenceFileAsBinaryOutputFormat, with compression).

The code below won't work unless I copy the flink-hadoop-compatibility jar to FLINK_HOME/lib. I found a similar discussion, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html; it seems this is the only way to get Hadoop compatibility to work?
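(For reference, the failure without the jar can be reproduced outside any job. Below is a minimal check; it assumes, as the linked thread suggests, that Flink's type extraction loads WritableTypeInfo from flink-hadoop-compatibility reflectively.)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.hadoop.io.BytesWritable;

public class WritableTypeCheck {
    public static void main(String[] args) {
        // If flink-hadoop-compatibility is not visible to the classloader
        // that loaded flink-core, the type extractor cannot instantiate
        // WritableTypeInfo for Hadoop Writable types, and this call fails
        // with an error pointing at the missing flink-hadoop-compatibility
        // dependency.
        TypeInformation<BytesWritable> ti = TypeInformation.of(BytesWritable.class);
        System.out.println(ti.getClass().getName());
    }
}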


If this is the case, do I need to copy that jar to every node of the cluster?


Or, for my super simple requirement above, is there any other way to go? For example, without using flink-hadoop-compatibility?
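(For what it's worth, the input side can also be wired up without the HadoopInputs helper, via the HadoopInputFormat wrapper that flink-java ships in org.apache.flink.api.java.hadoop.mapreduce. A sketch, reusing the types from the code below; "inputJob" is just an illustrative name. Note that Flink still needs the Writable type information at runtime, so this drops the compile-time import of the hadoopcompatibility package but most likely not the need for the jar in lib.)

// Set up a Hadoop Job carrying the input path, then wrap the Hadoop
// SequenceFileInputFormat in Flink's HadoopInputFormat from flink-java.
Job inputJob = Job.getInstance();
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(inputJob, new Path("hdfs://..."));
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<NullWritable, BytesWritable> hadoopIF =
        new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(
                new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable, BytesWritable>(),
                NullWritable.class, BytesWritable.class, inputJob);
// The wrapper reports its own produced type, so no explicit TupleTypeInfo
// should be needed here.
DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(hadoopIF);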


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;

import com.twitter.chill.protobuf.ProtobufSerializer;

public class Foobar {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ProtobufObject is our protobuf-generated class; register it with
        // Kryo through chill-protobuf so Flink can serialize it.
        env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class, ProtobufSerializer.class);

        // Read the sequence file as (NullWritable, BytesWritable) records.
        String path = "hdfs://...";
        DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                        new SequenceFileInputFormat<NullWritable, BytesWritable>(),
                        NullWritable.class, BytesWritable.class, path),
                new TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
                        TypeInformation.of(BytesWritable.class)));

        // Parse each value as a protobuf message and re-key the record by
        // the message's key field.
        FlatMapOperator<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>> x =
                input.flatMap(new FlatMapFunction<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>>() {

                    @Override
                    public void flatMap(Tuple2<NullWritable, BytesWritable> value,
                            Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {
                        ProtobufObject info = ProtobufObject.parseFrom(value.f1.copyBytes());
                        String key = info.getKey();
                        out.collect(new Tuple2<>(new BytesWritable(key.getBytes()),
                                new BytesWritable(info.toByteArray())));
                    }
                });

        // Write back as a block-compressed binary sequence file.
        Job job = Job.getInstance();
        HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF =
                new HadoopOutputFormat<>(new SequenceFileAsBinaryOutputFormat(), job);
        hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
        hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
                CompressionType.BLOCK.toString());
        FileOutputFormat.setOutputPath(job, new Path("hdfs://..."));

        x.output(hadoopOF);
        env.execute("foo");
    }
}
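(A side note on the compression settings: the raw configuration keys above can also be set through the typed static helpers that SequenceFileAsBinaryOutputFormat inherits from FileOutputFormat and SequenceFileOutputFormat, applied to the same Job instance that is handed to HadoopOutputFormat. This should be equivalent in effect to the getConfiguration().set(...) calls, since the wrapper works on the job's Configuration.)

// Same effect as "mapreduce.output.fileoutputformat.compress" = "true":
SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
// Same effect as "mapreduce.output.fileoutputformat.compress.type" = "BLOCK":
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
// Optionally pin the codec explicitly (Hadoop otherwise uses DefaultCodec):
// SequenceFileAsBinaryOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.DefaultCodec.class);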

