package org.apache.flink.formats.avro; import java.io.IOException; import java.io.OutputStream; import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; public class CustomAvroWriters { public static AvroWriterFactory forSpecificRecord(Class type) { String schemaString = SpecificData.get().getSchema(type).toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory forGenericRecord(Schema schema) { String schemaString = schema.toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory forGenericRecord(Schema schema, String codecName) { String schemaString = schema.toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory forReflectRecord(Class type) { String schemaString = ReflectData.get().getSchema(type).toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out); }; return new AvroWriterFactory(builder); } private static DataFileWriter createAvroDataFileWriter(String schemaString, Function> datumWriterFactory, OutputStream out) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.create(schema, out); return dataFileWriter; } private static DataFileWriter createAvroDataFileWriter(String schemaString, Function> datumWriterFactory, OutputStream out, String codecName) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); CompressionCodec codec = getCompressionCodec(codecName); CompressionOutputStream cos = codec.createOutputStream(out); dataFileWriter.create(schema, cos); return dataFileWriter; } private CustomAvroWriters() { } private static CompressionCodec getCompressionCodec(String codecName) { CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration()); CompressionCodec codec = codecFactory.getCodecByName(codecName); if (codec == null) { try { codec = (CompressionCodec) Class.forName(codecName).newInstance(); }catch(Exception ex) { throw new RuntimeException("Codec " + codecName + " not found.",ex); } } return codec; } }