StreamingFileSink.forBulkFormat() for Avro is not working with withRollingPolicy


mahendra.hegde

Hi,

I am trying to upload Avro records to AWS S3 using StreamingFileSink.

The Avro file is created and uploaded with valid data, but I want to add a rolling policy that rolls the part file after a specific time or once it reaches a given total size.

With forBulkFormat() I can only use CheckpointRollingPolicy, which automatically rolls all part files on every checkpoint.

Although CheckpointRollingPolicy lets us override 'shouldRollOnEvent' and 'shouldRollOnProcessingTime', the sink does not follow the overridden logic and simply rolls on each checkpoint:

- The 'shouldRollOnProcessingTime' method is not invoked for each streaming message, its

- In 'shouldRollOnEvent', the part file size always shows the fixed size of a single message, not the accumulated size of the part file.

Below is the code snippet:

    val avroOcfFilesink: StreamingFileSink[GenericRecord] = StreamingFileSink.forBulkFormat(new Path(avroOutputPath),
      new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
        override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
          val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
          val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
          val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
          //dataFileWriter.setCodec(CodecFactory.snappyCodec)
          dataFileWriter.create(schema, out)
          dataFileWriter
        }
      }))
      .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
        override def getBucketId(in: GenericRecord, context: Context): String = {
          val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
          val currentProcessingTimeUTC = System.currentTimeMillis()
          bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
        }
        override def getSerializer: SimpleVersionedSerializer[String] = { SimpleVersionedStringSerializer.INSTANCE }
      })
      .withRollingPolicy(
        new CheckpointRollingPolicy[GenericRecord, String] {
          override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
            println("partFileState.getSize:" + partFileState.getSize)
            (partFileState.getSize >= 1024 * 8)
          }
          override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = {
            println("currentTime:" + currentTime + " , partFileState.getCreationTime" + partFileState.getCreationTime + ", Diff:" + (currentTime - partFileState.getCreationTime))
            (currentTime - partFileState.getCreationTime >= 600000)
          }
        }
      ).build()

    avroRecordStream.addSink(avroOcfFilesink).setParallelism(1).name("AvroToS3Bucket")
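
To make the observed behaviour easier to reason about, here is a small stand-alone model of the three rolling decisions. It does not use Flink at all: PartFile, ToyRollingPolicy, PostPolicy and RollSimulation are hypothetical names made up for this illustration. It rests on two assumptions that this post does not confirm: a checkpoint-based policy always answers "roll" at a checkpoint, and the processing-time check is driven by a periodic timer rather than by each record. Under those assumptions, the size and time thresholds only matter if they are reached before the next checkpoint.

```scala
// Toy model only: these types imitate the shape of a rolling policy's callbacks.
// They are hypothetical and are NOT the Flink classes.
final case class PartFile(creationTime: Long, size: Long)

trait ToyRollingPolicy {
  def shouldRollOnCheckpoint(part: PartFile): Boolean
  def shouldRollOnEvent(part: PartFile, elementSize: Long): Boolean
  def shouldRollOnProcessingTime(part: PartFile, now: Long): Boolean
}

// Same thresholds as the policy in the post: 8 KiB or 10 minutes.
// Assumption: a checkpoint-based policy always rolls at a checkpoint.
object PostPolicy extends ToyRollingPolicy {
  def shouldRollOnCheckpoint(part: PartFile): Boolean = true
  def shouldRollOnEvent(part: PartFile, elementSize: Long): Boolean =
    part.size >= 1024 * 8
  def shouldRollOnProcessingTime(part: PartFile, now: Long): Boolean =
    now - part.creationTime >= 600000L
}

object RollSimulation {
  /** Simulates one bucket in 1-second steps and returns the reason for each roll, in order. */
  def run(policy: ToyRollingPolicy, durationMs: Long, eventEveryMs: Long,
          eventSize: Long, checkpointEveryMs: Long, timerEveryMs: Long): Vector[String] = {
    var part: Option[PartFile] = None
    var rolls = Vector.empty[String]
    for (t <- 1000L to durationMs by 1000L) {
      if (t % eventEveryMs == 0) {
        // The size check sees the part file as it was before this element is written.
        if (part.exists(p => policy.shouldRollOnEvent(p, eventSize))) {
          rolls :+= "size"
          part = None
        }
        val current = part.getOrElse(PartFile(creationTime = t, size = 0L))
        part = Some(current.copy(size = current.size + eventSize))
      }
      if (t % timerEveryMs == 0 && part.exists(p => policy.shouldRollOnProcessingTime(p, t))) {
        // Assumption: the age check runs on a periodic timer, not once per element.
        rolls :+= "time"
        part = None
      }
      if (t % checkpointEveryMs == 0 && part.exists(policy.shouldRollOnCheckpoint)) {
        rolls :+= "checkpoint"
        part = None
      }
    }
    rolls
  }
}
```

For example, with one 100-byte record per second, a checkpoint every 60 seconds and a 5-minute run, `RollSimulation.run(PostPolicy, 300000L, 1000L, 100L, 60000L, 60000L)` reports only checkpoint rolls: a part file never grows past about 6 KB or gets older than one minute before a checkpoint closes it. In this model the 8 KiB and 10-minute thresholds can only cut a part file shorter than a checkpoint interval; they cannot keep it open longer.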