Dynamically get schema from element to pass to AvroParquetWriter

Kyle Hamlin
I implemented an Avro-to-Parquet writer that previously took an Avro schema as a string in its constructor and passed it to the AvroParquetWriter. Now I'm wondering whether there is a way to get the schema from the element itself and pass that to the AvroParquetWriter. I tried grabbing the schema from the element in the write method, but write is called after open, so the schema is not yet available when the writer is built. I need this because I'm sinking several Kafka topics to S3 in one app, so messages from different topics need different schemas passed to the writer.

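import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class ParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _

  override def write(element: T): Unit = {
    // Too late: open() has already built the writer by the time this runs.
    schema = element.getSchema
    writer.write(element)
  }

  override def duplicate(): ParquetSinkWriter[T] = new ParquetSinkWriter[T]()

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def open(fs: FileSystem, path: Path): Unit = {
    // schema is still null here, because write() has not been called yet.
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

}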
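One common workaround, not taken from this thread, is to stop building the writer in open and instead create it on the first call to write, when element.getSchema is finally available. The sketch below shows only that control flow in plain Scala: Record and FileWriterStub are hypothetical stand-ins for GenericRecord and ParquetWriter[T] so it runs without Flink, Avro, Parquet or Hadoop. In the real ParquetSinkWriter, open would just save the Path, and the lazy branch in write would call AvroParquetWriter.builder[T](path).withSchema(element.getSchema).withCompressionCodec(CompressionCodecName.SNAPPY).build(). It assumes each file only ever receives records of a single schema, which holds if records from different topics end up in different sinks or bucket paths.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an Avro GenericRecord: a schema name plus a payload.
final case class Record(schema: String, value: String)

// Hypothetical stand-in for ParquetWriter[T]. A Parquet file has a single schema,
// so the stub rejects records whose schema differs from the one it was built with.
final class FileWriterStub(val path: String, val schema: String) {
  private val buffer = ArrayBuffer.empty[String]
  def write(r: Record): Unit = {
    require(r.schema == schema, s"schema ${r.schema} does not match file schema $schema")
    buffer += r.value
  }
  def dataSize: Long = buffer.map(_.length.toLong).sum
  def rows: List[String] = buffer.toList
  def close(): Unit = ()
}

// Same open/write/getPos/flush/close lifecycle as the Writer in the post, but the
// file writer is created lazily from the schema of the first element.
class LazySchemaSinkWriter {
  private var path: String = null
  private var writer: FileWriterStub = null
  // Files closed so far, kept only so the example can be inspected.
  val finished = ArrayBuffer.empty[FileWriterStub]

  def open(p: String): Unit = {
    path = p // only remember the target; the schema is not known yet
    writer = null
  }

  def write(element: Record): Unit = {
    if (writer == null) {
      // First element of this file: its schema is now available.
      writer = new FileWriterStub(path, element.schema)
    }
    writer.write(element)
  }

  def getPos: Long = if (writer == null) 0L else writer.dataSize
  def flush(): Long = getPos

  def close(): Unit = {
    // A file may be opened and closed without receiving any element.
    if (writer != null) {
      writer.close()
      finished += writer
      writer = null
    }
  }
}

// Usage: one file, two records sharing a schema.
val sink = new LazySchemaSinkWriter
sink.open("s3://bucket/clicks/part-0")
sink.write(Record("Click", "alice"))
sink.write(Record("Click", "bob"))
val posBeforeClose = sink.getPos
sink.close()
```

Note that close and getPos must tolerate a writer that was never created; the original close() would throw a NullPointerException in that case once writer creation is deferred.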

Re: Dynamically get schema from element to pass to AvroParquetWriter

Fabian Hueske
Hi Kyle,

I'm not sure I understand the problem. I assume you have one sink for each Avro type (Kafka topic).
If you have multiple sinks, why is it not possible to configure each one with the correct Avro schema?

Best, Fabian
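Fabian's suggestion keeps the schema out of the element entirely: each topic gets its own sink, and each sink's writer is constructed with that topic's schema, as the original string-in-the-constructor design already did. The sketch below models only that wiring in plain Scala; SchemaBoundWriter is a hypothetical stand-in for ParquetWriter[T], and the topic names and schema strings are made up. In Flink, each map entry would plausibly become its own BucketingSink (configured via setWriter) on that topic's stream, with open parsing the string via new Schema.Parser().parse(schemaString).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ParquetWriter[T]: remembers the schema it was built with.
final class SchemaBoundWriter(val path: String, val schema: String) {
  val rows = ArrayBuffer.empty[String]
  def write(value: String): Unit = { rows += value }
}

// As in the original design, the schema arrives as a String in the constructor,
// so open() can build the writer before any element has been seen.
class FixedSchemaSinkWriter(schemaString: String) {
  private var writer: SchemaBoundWriter = null
  def open(path: String): Unit = { writer = new SchemaBoundWriter(path, schemaString) }
  def write(value: String): Unit = { writer.write(value) }
  // duplicate() must carry the schema along, unlike the no-argument version in the post.
  def duplicate(): FixedSchemaSinkWriter = new FixedSchemaSinkWriter(schemaString)
  def current: SchemaBoundWriter = writer
}

// Hypothetical topics and schema strings; one sink per topic.
val schemasByTopic = Map(
  "clicks" -> """{"type":"record","name":"Click","fields":[]}""",
  "views" -> """{"type":"record","name":"View","fields":[]}"""
)
val sinks = schemasByTopic.map { case (topic, schema) => topic -> new FixedSchemaSinkWriter(schema) }
sinks.foreach { case (topic, s) => s.open(s"s3://bucket/$topic/part-0") }
sinks("clicks").write("c1")
```

Keeping the schema as a String keeps the writer simple to serialize when Flink ships it to the task managers.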

(In reply to Kyle Hamlin's message of 2018-01-05 22:11 GMT+01:00, quoted in full above.)