Hi,

I'm using Flink 1.5.6 and Hadoop 2.7.1. My requirement is to read an HDFS sequence file (SequenceFileInputFormat) and then write it back to HDFS (SequenceFileAsBinaryOutputFormat, with compression). The code below does not work until I copy the flink-hadoop-compatibility jar into FLINK_HOME/lib. I found a similar discussion here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html. It seems that this is the only way to get the Hadoop compatibility to work? If that is the case, do I need to copy that jar to every node of the cluster? Or, for my fairly simple requirement above, is there another way to go, for example without using flink-hadoop-compatibility at all?

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;

import com.twitter.chill.protobuf.ProtobufSerializer;

public class Foobar {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ProtobufObject is a protobuf-generated class; serialize it with chill-protobuf.
        env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class, ProtobufSerializer.class);

        String path = "hdfs://...";

        // Read the sequence file as (NullWritable, BytesWritable) records.
        DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                        new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable, BytesWritable>(),
                        NullWritable.class, BytesWritable.class, path),
                new TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
                        TypeInformation.of(BytesWritable.class)));

        // Re-key each record by a field extracted from the protobuf payload.
        FlatMapOperator<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>> x = input.flatMap(
                new FlatMapFunction<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>>() {
                    @Override
                    public void flatMap(Tuple2<NullWritable, BytesWritable> value,
                            Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {
                        ProtobufObject info = ProtobufObject.parseFrom(value.f1.copyBytes());
                        String key = info.getKey();
                        out.collect(new Tuple2<BytesWritable, BytesWritable>(
                                new BytesWritable(key.getBytes()),
                                new BytesWritable(info.toByteArray())));
                    }
                });

        // Write back as a block-compressed binary sequence file.
        Job job = Job.getInstance();
        HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF =
                new HadoopOutputFormat<BytesWritable, BytesWritable>(new SequenceFileAsBinaryOutputFormat(), job);
        hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
        hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
                CompressionType.BLOCK.toString());
        FileOutputFormat.setOutputPath(job, new Path("hdfs://..."));

        x.output(hadoopOF);
        env.execute("foo");
    }
}
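For reference, here is roughly the alternative I had in mind for the read side: constructing Flink's HadoopInputFormat wrapper directly instead of going through the HadoopInputs helper from flink-hadoop-compatibility. I am only assuming that the wrapper class in org.apache.flink.api.java.hadoop.mapreduce is available without the extra jar on 1.5; I have not verified that, so please correct me if this makes no difference.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class DirectHadoopInputSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build the Hadoop job that carries the input path and configuration.
        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path("hdfs://..."));

        // Wrap the mapreduce SequenceFileInputFormat in Flink's HadoopInputFormat
        // directly, instead of using HadoopInputs.readHadoopFile(...).
        HadoopInputFormat<NullWritable, BytesWritable> hadoopIF =
                new HadoopInputFormat<>(
                        new SequenceFileInputFormat<NullWritable, BytesWritable>(),
                        NullWritable.class, BytesWritable.class, job);

        DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(
                hadoopIF,
                new TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
                        TypeInformation.of(BytesWritable.class)));

        // Just a sanity check on the first few records; print() triggers execution.
        input.first(10).print();
    }
}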