Sorry, something went wrong with the code for the Writer. Here it is again:

import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

@SerialVersionUID(1L)
class MyAvroParquetWriter[T](schema: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize
}

Using this library as a dependency: "org.apache.parquet" % "parquet-avro" % "1.8.1". We use this writer in a rolling sink and it seems fine so far; a hookup sketch follows the quoted message below.

Cheers,
Bruno

On Wed, 18 Jan 2017 at 09:09 elmosca <[hidden email]> wrote:

Hi Biswajit,
We use the following Writer for Parquet output via Avro conversion (in Scala):
Using this library as a dependency: "org.apache.parquet" % "parquet-avro" % "1.8.1". We use this writer in a rolling sink and it seems fine so far.
Cheers,
Bruno
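
For readers wiring this up, here is a minimal sketch of how the writer might be attached to a RollingSink. The schema, output path, toy source, and job wiring are illustrative assumptions, not part of Bruno's setup:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.RollingSink

object ParquetSinkJob {

  // Toy Avro schema; substitute your own record schema.
  val schemaJson: String =
    """{"type":"record","name":"Event","fields":[{"name":"id","type":"long"}]}"""

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val schema = new Schema.Parser().parse(schemaJson)

    // Toy source: wrap a few longs in Avro GenericRecords.
    val records: DataStream[GenericRecord] =
      env.fromElements(1L, 2L, 3L).map { i =>
        val record: GenericRecord = new GenericData.Record(schema)
        record.put("id", i)
        record
      }

    // Attach the Parquet writer to a rolling sink; files roll once the
    // batch size in bytes (as reported by getPos) is exceeded.
    val sink = new RollingSink[GenericRecord]("/tmp/parquet-out")
      .setWriter(new MyAvroParquetWriter[GenericRecord](schemaJson))
      .setBatchSize(1024 * 1024 * 128)

    records.addSink(sink)
    env.execute("parquet-rolling-sink-example")
  }
}

Note that Writer and RollingSink live in the flink-connector-filesystem artifact, so that dependency is needed alongside parquet-avro. One design note: getPos here returns Parquet's buffered data size rather than a true file offset, which works for size-based rolling but may weaken the sink's truncate-on-recovery guarantees.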