I implemented an Avro to Parquet writer which previously took an Avro schema in as a string to the constructor and passed it to the AvroParquetWriter. Now I'm wondering if there is a way to get the schema from the element and pass to the AvroParquetWriter. I tried grabbing the schema from the element in the write method but it is called later than open so that doesn't seem to work. I need to do this because I'm sinking several Kafka topics in one app to s3 so different messages need different schema passed to the writer.
class ParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {
@transient private var writer: ParquetWriter[T] = _
@transient private var schema: Schema = _
override def write(element: T): Unit = {
schema = element.getSchema
writer.write(element)
}
override def duplicate(): ParquetSinkWriter[T] = new ParquetSinkWriter[T]()
override def close(): Unit = writer.close()
override def getPos: Long = writer.getDataSize
override def flush(): Long = writer.getDataSize
override def open(fs: FileSystem, path: Path): Unit = {
writer = AvroParquetWriter.builder[T](path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build()
}
}