Rolling sink parquet/Avro output

Rolling sink parquet/Avro output

bdas77
Hi there,

Does anyone have a reference for a Parquet/Avro writer for the rolling sink? I'm seeing some issues, given that the stream is handled inside the rolling sink, and I don't see many options to override, or even methods left open to subclass. I could resolve this with a custom rolling sink writer; just wondering whether anyone has done something similar or whether something is already out there. Please correct me if I'm missing anything here.

~Biswajit

Thank you
~/Das

Re: Rolling sink parquet/Avro output

elmosca
Hi Biswajit,

We use the following Writer for Parquet output via Avro conversion (in Scala):

import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

@SerialVersionUID(1L)
class MyAvroParquetWriter[T](schema: String) extends Writer[T] {

  // ParquetWriter is not serializable, so it is marked @transient and
  // created in open() on the task manager rather than shipped with the sink.
  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  // The sink calls duplicate() to obtain a fresh writer for each new part file.
  override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)

  override def close(): Unit = writer.close()

  // Parquet buffers row groups in memory and cannot report an exact file
  // offset mid-write; getDataSize is the closest available estimate.
  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

}

We use this library as a dependency: "org.apache.parquet" % "parquet-avro" % "1.8.1". We use this writer in a rolling sink and it seems fine so far.
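
For context, a minimal sketch of wiring this writer into Flink's RollingSink follows. The schema JSON, the output path, and the attachSink helper are illustrative assumptions, not part of the original message:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.fs.RollingSink

object ParquetSinkExample {

  // Illustrative Avro schema; any schema matching the stream's records works.
  val schemaString: String =
    """{
      |  "type": "record",
      |  "name": "Event",
      |  "fields": [
      |    {"name": "id", "type": "string"},
      |    {"name": "ts", "type": "long"}
      |  ]
      |}""".stripMargin

  // Assumes a DataStream of Avro GenericRecords produced upstream.
  def attachSink(stream: DataStream[GenericRecord]): Unit = {
    val sink = new RollingSink[GenericRecord]("hdfs:///data/parquet-out")
      .setWriter(new MyAvroParquetWriter[GenericRecord](schemaString))
      .setBatchSize(128L * 1024 * 1024) // roll a new part file at ~128 MB

    stream.addSink(sink)
  }
}

The sink compares the writer's getPos against the configured batch size to decide when to roll, which is why getPos above reports the Parquet data size.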

Cheers,

Bruno

Re: Rolling sink parquet/Avro output

bdas77
Thanks for the mail, Bruno!


Thank you
~/Das