Hi,

I am trying to upload Avro records to AWS S3 using StreamingFileSink. The Avro file is created and uploaded with valid data, but I want to add a rolling policy that rolls the file after a specific time or once the part file reaches a given total size.

With forBulkFormat() I am only able to use CheckpointRollingPolicy, which automatically rolls all part files on every checkpoint. Although CheckpointRollingPolicy lets us override 'shouldRollOnEvent' and 'shouldRollOnProcessingTime', it does not behave according to the overridden logic and just rolls on each checkpoint:

- 'shouldRollOnProcessingTime' is not invoked for each streaming message.
- In 'shouldRollOnEvent', the part file size always shows the fixed size of a single message, not the accumulated part file size.

Below is the code snippet:

    val avroOcfFilesink: StreamingFileSink[GenericRecord] =
      StreamingFileSink
        .forBulkFormat(
          new Path(avroOutputPath),
          new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
            // Creates an Avro object container file writer on top of the sink's output stream.
            override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
              val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
              val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
              val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
              //dataFileWriter.setCodec(CodecFactory.snappyCodec)
              dataFileWriter.create(schema, out)
              dataFileWriter
            }
          }))
        .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
          // Bucket id = configured prefix + current processing time formatted by the run-date helper.
          override def getBucketId(in: GenericRecord, context: Context): String = {
            val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
            val currentProcessingTimeUTC = System.currentTimeMillis()
            bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
          }

          override def getSerializer: SimpleVersionedSerializer[String] = {
            SimpleVersionedStringSerializer.INSTANCE
          }
        })
        .withRollingPolicy(new CheckpointRollingPolicy[GenericRecord, String] {
          // Intended: roll once the part file reaches 8 KB.
          override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
            println("partFileState.getSize:" + partFileState.getSize)
            (partFileState.getSize >= 1024 * 8)
          }

          // Intended: roll once the part file is 10 minutes (600000 ms) old.
          override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = {
            println("currentTime:" + currentTime + " , partFileState.getCreationTime" + partFileState.getCreationTime + ", Diff:" + (currentTime - partFileState.getCreationTime))
            (currentTime - partFileState.getCreationTime >= 600000)
          }
        })
        .build()

    avroRecordStream.addSink(avroOcfFilesink).setParallelism(1).name("AvroToS3Bucket")
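To make the intended behaviour concrete, here is a minimal sketch of the roll decision I am after, written without any Flink dependency. PartInfo, RollDecision and the method names are hypothetical names used only for this illustration (they are not Flink classes); the two thresholds are the ones from the snippet above.

```scala
// Hypothetical stand-in for the part-file state; NOT a Flink class.
final case class PartInfo(sizeBytes: Long, creationTimeMs: Long)

// Hypothetical helper showing the roll decision I would like the sink to apply.
object RollDecision {
  val MaxPartSizeBytes: Long = 1024L * 8 // 8 KB, same threshold as in the snippet
  val MaxPartAgeMs: Long = 600000L       // 10 minutes, same threshold as in the snippet

  // Roll when the accumulated part file size reaches the limit.
  def shouldRollOnSize(part: PartInfo): Boolean =
    part.sizeBytes >= MaxPartSizeBytes

  // Roll when the part file has been open for at least the maximum age.
  def shouldRollOnAge(part: PartInfo, nowMs: Long): Boolean =
    nowMs - part.creationTimeMs >= MaxPartAgeMs

  // Desired overall behaviour: roll on size OR age, not on every checkpoint.
  def shouldRoll(part: PartInfo, nowMs: Long): Boolean =
    shouldRollOnSize(part) || shouldRollOnAge(part, nowMs)
}
```

For example, a part file of 100 bytes that was created 5 minutes ago should stay open, while one of 8192 bytes, or one that is 10 minutes old, should be rolled.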