package org.apache.flink.formats.avro;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.function.Function;

public class CustomAvroWriters {
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out);
        };
        return new AvroWriterFactory(builder);
    }

    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out);
        };
        return new AvroWriterFactory(builder);
    }

    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        };
        return new AvroWriterFactory(builder);
    }

    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out);
        };
        return new AvroWriterFactory(builder);
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(String schemaString, Function<Schema, DatumWriter<T>> datumWriterFactory, OutputStream out) throws IOException {
        Schema schema = (new Parser()).parse(schemaString);
        DatumWriter<T> datumWriter = (DatumWriter)datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter);
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(String schemaString, Function<Schema, DatumWriter<T>> datumWriterFactory, OutputStream out, String codecName) throws IOException {
        Schema schema = (new Parser()).parse(schemaString);
        DatumWriter<T> datumWriter = (DatumWriter)datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter);
        dataFileWriter.setCodec(CodecFactory.fromString(codecName));
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private CustomAvroWriters() {
    }
}