StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

mahendra.hegde

Hello,

I am trying to use StreamingFileSink.forBulkFormat() to write Avro files to S3.

I am using CheckpointRollingPolicy because DefaultRollingPolicy cannot be used with bulk formats.

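A CheckpointRollingPolicy always rolls on every checkpoint, but its shouldRollOnEvent and shouldRollOnProcessingTime methods can still roll a part file earlier. One way to keep that logic easy to test is a plain Scala decision class that both overrides delegate to. The sketch below is hypothetical: RollDecision and its thresholds are not part of Flink, and its criteria only mirror the part size, rollover interval and inactivity interval that DefaultRollingPolicy exposes. A size-based check is only as accurate as the size the sink reports.

```scala
// Hypothetical, Flink-independent rolling decision that overrides of
// shouldRollOnEvent and shouldRollOnProcessingTime could delegate to.
// A case class is Serializable, so it can be captured by the rolling policy.
final case class RollDecision(
    maxPartSizeBytes: Long,
    rolloverIntervalMs: Long,
    inactivityIntervalMs: Long
) {
  // Roll on a new record once the reported part size reaches the limit.
  def onEvent(partSizeBytes: Long): Boolean =
    partSizeBytes >= maxPartSizeBytes

  // Roll on the periodic processing-time check if the part file is too old
  // or has not received a record for too long.
  def onProcessingTime(creationTime: Long, lastUpdateTime: Long, currentTime: Long): Boolean =
    currentTime - creationTime >= rolloverIntervalMs ||
      currentTime - lastUpdateTime >= inactivityIntervalMs
}

// Example thresholds (assumptions): 128 MiB, 15 minutes, 5 minutes.
val decision = RollDecision(
  maxPartSizeBytes = 128L * 1024 * 1024,
  rolloverIntervalMs = 15L * 60 * 1000,
  inactivityIntervalMs = 5L * 60 * 1000
)
```

Inside the anonymous CheckpointRollingPolicy, shouldRollOnEvent could then return decision.onEvent(partFileState.getSize), and shouldRollOnProcessingTime could return decision.onProcessingTime(partFileState.getCreationTime, partFileState.getLastUpdateTime, currentTime).
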
However, I am facing two issues:

  1. The shouldRollOnEvent method is called for every record that is added, but getSize() always returns the size of a single message instead of the current part file size.
  2. Files are rolled every minute even though my checkpoint interval is longer (3 minutes), and I can't find any way to override this one-minute default rolling.

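On the first issue: if, as assumed here, getSize reports the bytes already written to the part-file stream, then a writer that buffers records in memory will not move that number record by record. Avro's DataFileWriter accumulates records into a data block before writing the block out. The pure-Scala sketch below uses java.io.BufferedOutputStream as a stand-in for that buffering; the 1 KiB buffer and 100-byte records are arbitrary illustration values, not Avro's real block size.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream}

// `underlying` stands in for the part-file stream whose size is observed;
// `buffered` stands in for a writer that holds records in memory first.
val underlying = new ByteArrayOutputStream()
val buffered = new BufferedOutputStream(underlying, 1024) // hypothetical 1 KiB buffer
val record = Array.fill[Byte](100)(1)                     // hypothetical 100-byte record

// Bytes that have actually reached the underlying stream after each write.
val sizesSeen: Seq[Int] = (1 to 11).map { _ =>
  buffered.write(record)
  underlying.size()
}
// The observed size stays at 0 for the first ten records and only jumps
// when the buffer is full and its contents are written out in one go.
```
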
Any suggestions would be appreciated.

Code:

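import java.io.OutputStream

import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.GenericRecord
import org.apache.avro.reflect.ReflectDatumWriter
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, PartFileInfo, StreamingFileSink}
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy

// avroOutputPath, faultCodeOCFRecordSchema, configurationParameters, TimeConversion
// and log are defined elsewhere in the job.
val avroOcfFilesink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(
    new Path(avroOutputPath),
    new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
      // Creates one Snappy-compressed Avro OCF writer per part file.
      override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
        val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
        val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
        val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
      }
    }))
  .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
    // Bucket id = configured prefix + current processing time (HHMM).
    override def getBucketId(in: GenericRecord, context: BucketAssigner.Context): String = {
      val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
      val currentProcessingTimeUTC = System.currentTimeMillis()
      bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
    }

    override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
  })
  .withBucketCheckInterval(120000)
  .withRollingPolicy(new CheckpointRollingPolicy[GenericRecord, String] {
    // Called for every record; only logs the part file state, never rolls.
    override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
      log.info("###### PartFileState.getSize:" + partFileState.getSize +
        ", CreationTime:" + partFileState.getCreationTime +
        ", LastUpdate:" + partFileState.getLastUpdateTime)
      false
    }

    // Called on the periodic bucket check; only logs the age check, never rolls.
    override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = {
      val result: Boolean = (currentTime - partFileState.getCreationTime) >= 10000
      log.info("currentTime:" + currentTime +
        ", partFileState.getCreationTime:" + partFileState.getCreationTime +
        ", Diff:" + (currentTime - partFileState.getCreationTime) +
        ", result:" + result)
      false
    }
  })
  .build()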

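On the second issue, the bucket id in getBucketId is built from the processing time formatted to the minute. Assuming TimeConversion.convertTimestampToRunDate_HHMM (not shown above) formats a UTC timestamp as hour and minute, the bucket id changes every minute, and each new bucket gets its own part file regardless of the rolling policy. The sketch below uses a hypothetical java.time stand-in for that helper to show the effect.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical stand-in for TimeConversion.convertTimestampToRunDate_HHMM,
// assuming it formats epoch milliseconds as HHmm in UTC.
val hhmm: DateTimeFormatter = DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC)

def bucketId(prefix: String, processingTimeMillis: Long): String =
  prefix + hhmm.format(Instant.ofEpochMilli(processingTimeMillis))

// Records processed within the same wall-clock minute share a bucket...
val first = bucketId("prefix/", 0L)
val sameMinute = bucketId("prefix/", 59999L)
// ...but the first record of the next minute opens a new bucket and part file.
val nextMinute = bucketId("prefix/", 60000L)
```
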
Thanks

MH