package org.apache.flink.formats.avro; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import java.io.IOException; import java.io.OutputStream; import java.util.function.Function; public class CustomAvroWriters { public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) { String schemaString = SpecificData.get().getSchema(type).toString(); AvroBuilder<T> builder = (out) -> { return createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) { String schemaString = schema.toString(); AvroBuilder<GenericRecord> builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out); }; return new AvroWriterFactory(builder); } public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) { String schemaString = schema.toString(); AvroBuilder<GenericRecord> builder = (out) -> { return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName); }; return new AvroWriterFactory(builder); } public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) { String schemaString = ReflectData.get().getSchema(type).toString(); AvroBuilder<T> builder = (out) -> { return createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out); }; return new AvroWriterFactory(builder); } private static <T> DataFileWriter<T> createAvroDataFileWriter(String schemaString, Function<Schema, DatumWriter<T>> datumWriterFactory, OutputStream out) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter<T> datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.create(schema, out); return dataFileWriter; } private static <T> DataFileWriter<T> createAvroDataFileWriter(String schemaString, Function<Schema, DatumWriter<T>> datumWriterFactory, OutputStream out, String codecName) throws IOException { Schema schema = (new Parser()).parse(schemaString); DatumWriter<T> datumWriter = (DatumWriter)datumWriterFactory.apply(schema); DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter); dataFileWriter.setCodec(CodecFactory.fromString(codecName)); dataFileWriter.create(schema, out); return dataFileWriter; } private CustomAvroWriters() { } }