Hello,
I am trying to use `StreamingFileSink.forBulkFormat()` to write Avro files to S3.
I have used `CheckpointRollingPolicy`, since `DefaultRollingPolicy` cannot be used with bulk formats.
However, with this policy I am facing two issues:
- `shouldRollOnEvent` is called every time a record is added, but `getSize()` always returns the size of a single message instead of the current size of the part file.
- Part files are rolled every minute even though my checkpoint interval is longer (3 minutes). I can't find any way to override this 1-minute rolling.
Any suggestion would be appreciated.
Code:
/**
 * Bulk-format sink writing Snappy-compressed Avro container files under `avroOutputPath`.
 *
 * NOTE(review), relevant to the rolling questions above:
 *  - Neither roll hook below ever returns `true`, so the rolling policy itself never rolls on
 *    events or on processing time. The bucket id, however, is built from the current
 *    processing time via `TimeConversion.convertTimestampToRunDate_HHMM`. If that helper
 *    formats to minute resolution (as its name suggests — confirm), every new minute yields a
 *    new bucket id and therefore a new part file. That would explain files appearing every
 *    minute regardless of the checkpoint interval.
 *  - `PartFileInfo.getSize` reports what has reached the part file so far. Avro's
 *    `DataFileWriter` presumably buffers records in memory until a block is flushed, so the
 *    reported size can lag behind the records appended — verify against the Avro version used.
 */
val avroOcfFilesink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(
    new Path(avroOutputPath),
    new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
      /** Opens a Snappy-compressed Avro container writer on top of `out`. */
      override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
        // Parsed here rather than captured, presumably so the serialized builder does not
        // carry a Schema instance — confirm whether Schema is Serializable in your Avro version.
        val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
        val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
        val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
      }
    })
  )
  .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
    // Resolved once when the assigner is built instead of on every record; a missing
    // "s3.bucket.id.prefix" key therefore fails at job construction, not mid-stream.
    private val bucketIdPrefix: String = configurationParameters.getRequired("s3.bucket.id.prefix")

    /** Bucket id = configured prefix + the processing time formatted by TimeConversion. */
    override def getBucketId(in: GenericRecord, context: Context): String =
      // Use the processing time supplied by the sink's context rather than the raw wall clock.
      bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(context.currentProcessingTime())

    override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
  })
  // Presumably the interval (ms) at which Flink invokes shouldRollOnProcessingTime — see Flink docs.
  .withBucketCheckInterval(120000)
  .withRollingPolicy(new CheckpointRollingPolicy[GenericRecord, String] {
    // Diagnostic only: the part-file age at which a processing-time roll *would* be reported.
    // It is logged, never acted upon.
    private val diagnosticMaxPartAgeMs: Long = 10000L

    /** Never rolls per record; logs the in-progress part file's stats for diagnosis. */
    override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
      // NOTE(review): INFO logging on every record is costly in production; lower the level once diagnosed.
      log.info("###### PartFileState.getSize:" + partFileState.getSize +
        ", Creation:" + partFileState.getCreationTime +
        ", Lastupdate:" + partFileState.getLastUpdateTime)
      false
    }

    /** Never rolls on processing time; logs the part file's age against the diagnostic threshold. */
    override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = {
      val ageMs: Long = currentTime - partFileState.getCreationTime
      val wouldRoll: Boolean = ageMs >= diagnosticMaxPartAgeMs
      log.info(" currentTime:" + currentTime +
        " , partFileState.getCreationTime:" + partFileState.getCreationTime +
        ", Diff:" + ageMs + ", result:" + wouldRoll)
      false
    }
  })
  .build()
Thanks
MH