Re: Rolling sink parquet/Avro output
Posted by elmosca
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Rolling-sink-parquet-Avro-output-tp11123p11126.html
Hi Biswajit,
We use the following Writer for Parquet output via Avro conversion (in Scala):
import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

@SerialVersionUID(1L)
class MyAvroParquetWriter[T](schema: String) extends Writer[T] {

  // The ParquetWriter is not serializable, so it is created in open() on the task side.
  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  // Each parallel sink instance works on its own copy of the writer.
  override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize
}
The dependency is "org.apache.parquet" % "parquet-avro" % "1.8.1". We use this writer in a rolling sink and it has worked fine so far.
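For reference, a minimal sketch of how such a writer could be plugged into a RollingSink; the path and the names stream and schemaJson are illustrative, assuming a DataStream of Avro GenericRecord and the schema JSON string already exist:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.fs.RollingSink

// stream: DataStream[GenericRecord] and schemaJson: String are assumed to be defined elsewhere.
val sink = new RollingSink[GenericRecord]("hdfs:///tmp/parquet-out")
  .setWriter(new MyAvroParquetWriter[GenericRecord](schemaJson))

stream.addSink(sink)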
Cheers,
Bruno