Hello, I am trying to use StreamingFileSink.forBulkFormat() to write Avro files to S3. I have used ‘CheckpointRollingPolicy’ because DefaultRollingPolicy cannot be used with bulk formats. But when I use it, I am facing two issues:
Any suggestions would be appreciated. Code: val avroOcfFilesink : StreamingFileSink[GenericRecord] = StreamingFileSink.forBulkFormat(new Path(avroOutputPath), new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() { override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = { val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema) val datumWriter = new ReflectDatumWriter[GenericRecord](schema) val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) dataFileWriter.setCodec(CodecFactory.snappyCodec) dataFileWriter.create(schema, out) dataFileWriter } })) .withBucketAssigner(new BucketAssigner[GenericRecord, String] { override def getBucketId(in: GenericRecord, context: Context): String = { val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix") val currentProcessingTimeUTC = System.currentTimeMillis() bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC) } override def getSerializer: SimpleVersionedSerializer[String] = { SimpleVersionedStringSerializer.INSTANCE } }).withBucketCheckInterval(120000) .withRollingPolicy( new CheckpointRollingPolicy[GenericRecord, String] { override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = { log.info("###### PartFileState.getSize:"+partFileState.getSize+", Creation"+partFileState.getCreationTime+", Lastupdate:"+partFileState.getLastUpdateTime) false } override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = { val result : Boolean = (currentTime - partFileState.getCreationTime) >= 10000 log.info(" currentTime:"+currentTime+" , partFileState.getCreationTime"+partFileState.getCreationTime+", Diff:"+(currentTime - partFileState.getCreationTime)+", result:"+result) false } } ).build() Thanks, MH
Free forum by Nabble | Edit this page |