Hi there, does anyone have a reference for a rolling sink Parquet/Avro writer? I'm seeing some issues because the stream is handled inside the rolling sink, and I don't see many options to override or even open in a subclass. I could resolve this with a custom rolling sink writer; I'm just wondering if anyone has done something similar or if something is already out there. Please correct me if I'm missing anything here. Thank you ~/Das
Hi Biswajit,
We use the following Writer for Parquet using Avro conversion (using Scala):

import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

@SerialVersionUID(1L)
class MyAvroParquetWriter[T](schema: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize
}

Using this library as a dependency: "org.apache.parquet" % "parquet-avro" % "1.8.1". We use this writer in a rolling sink and it seems fine so far.

Cheers,
Bruno
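For illustration, a minimal sketch of how a Writer like this might be attached to Flink's RollingSink could look as follows; the base path, batch size, and the use of GenericRecord as the element type are placeholder assumptions, not details from this thread:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.RollingSink

// Sketch only: `events` is assumed to be a DataStream[GenericRecord] built elsewhere,
// and `schemaJson` is the Avro schema string that the writer expects.
def addParquetSink(events: DataStream[GenericRecord], schemaJson: String): Unit = {
  val sink = new RollingSink[GenericRecord]("hdfs:///tmp/parquet-out") // placeholder base path
    .setWriter(new MyAvroParquetWriter[GenericRecord](schemaJson))
    .setBatchSize(128L * 1024 * 1024) // start a new part file at ~128 MB (arbitrary example)

  events.addSink(sink)
}

The rolling sink then calls open/write/close on its Writer for each part file it creates, and duplicate() to hand a fresh writer to each parallel subtask.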
Sorry, something went wrong with the code for the Writer. Here it is again:

import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

@SerialVersionUID(1L)
class MyAvroParquetWriter[T](schema: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize
}

Using this library as a dependency: "org.apache.parquet" % "parquet-avro" % "1.8.1". We use this writer in a rolling sink and it seems fine so far.

Cheers,
Bruno

On Wed, 18 Jan 2017 at 09:09 elmosca <[hidden email]> wrote:
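To make the schema parameter concrete, here is a hedged example of instantiating the writer for GenericRecord elements with a hand-written Avro schema; the record name and fields are invented purely for illustration:

import org.apache.avro.generic.GenericRecord

// Made-up schema, only to show what the `schema: String` constructor argument expects.
val exampleSchema =
  """{
    |  "type": "record",
    |  "name": "Event",
    |  "fields": [
    |    {"name": "id", "type": "string"},
    |    {"name": "timestamp", "type": "long"}
    |  ]
    |}""".stripMargin

val writer = new MyAvroParquetWriter[GenericRecord](exampleSchema)

Records written through this writer would then need to conform to that schema.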
Thanks for the mail, Bruno!!
On Wed, Jan 18, 2017 at 1:10 AM, Bruno Aranda <[hidden email]> wrote:
Thank you ~/Das