package org.apache.flink.formats.avro; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import java.io.IOException; import java.io.OutputStream; import java.util.function.Function; public class CustomAvroWriters { public static AvroWriterFactory forSpecificRecord(Class type) { String schemaString = SpecificData.get().getSchema(type).toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory forGenericRecord(Schema schema) { String schemaString = schema.toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory forGenericRecord(Schema schema, String codecName) { String schemaString = schema.toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory forReflectRecord(Class type) { String schemaString = ReflectData.get().getSchema(type).toString(); AvroBuilder builder = (out) -> { return createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out); }; return new AvroWriterFactory(builder); } private static DataFileWriter createAvroDataFileWriter(String schemaString, Function> datumWriterFactory, OutputStream out) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.create(schema, out); return dataFileWriter; } private static DataFileWriter createAvroDataFileWriter(String schemaString, Function> datumWriterFactory, OutputStream out, String codecName) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.setCodec(CodecFactory.fromString(codecName)); dataFileWriter.create(schema, out); return dataFileWriter; } private CustomAvroWriters() { } }