package org.apache.flink.streaming.connectors.fs.avro; /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import java.io.IOException; import java.util.Map; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; import org.apache.avro.generic.GenericData; import org.apache.avro.hadoop.file.HadoopCodecFactory; import org.apache.avro.hadoop.io.AvroDatumConverter; import org.apache.avro.hadoop.io.AvroDatumConverterFactory; import org.apache.avro.hadoop.io.AvroSerialization; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyValueRecordWriter; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.mapred.JobConf; /** * Implementation of AvroKeyValue writer that can be used in Sink. *

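You will need the following dependency (pay attention to the classifier; it works only with hadoop2).

 {@code

 First, add the avro-mapred dependency:
	<dependency>
		<groupId>org.apache.avro</groupId>
		<artifactId>avro-mapred</artifactId>
		<version>1.7.6</version>
		<classifier>hadoop2</classifier>
	</dependency>
}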
And then:
 {@code
 RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink = new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/path");
 sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
 Map<String, String> properties = new HashMap<>();
 Schema longSchema = Schema.create(Type.LONG);
 String keySchema = longSchema.toString();
 properties.put("avro.schema.output.key", keySchema);
 String valueSchema = longSchema.toString();
 properties.put("avro.schema.output.value", valueSchema);
 properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());

 sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
 sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 }
 
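To test with S3:
{@code
	Create a core-site.xml (I have not found another way to test locally):

	<configuration>
		<property>
			<name>fs.s3.impl</name>
			<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
		</property>

		<property>
			<name>fs.s3a.access.key</name>
			<value>xxx</value>
		</property>

		<property>
			<name>fs.s3a.secret.key</name>
			<value>yyy</value>
		</property>

		<property>
			<name>fs.s3a.buffer.dir</name>
			<value>/tmp</value>
		</property>
	</configuration>

	And add the following dependency (not sure what the best option is here):

	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-aws</artifactId>
		<version>2.7.0</version>
		<scope>provided</scope>
		<exclusions>
			<exclusion>
				<artifactId>guava</artifactId>
				<groupId>com.google.guava</groupId>
			</exclusion>
		</exclusions>
	</dependency>

 }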
 
*/ public class AvroSinkWriter implements Writer>, InputTypeConfigurable { private static final long serialVersionUID = 1L; private transient FSDataOutputStream outputStream; private transient AvroKeyValueRecordWriter writer; private Class keyClass; private Class valueClass; private final Map properties; /** * C'tor for the writer *

* You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above) * @param properties */ public AvroSinkWriter(Map properties) { this.properties = properties; } private AvroSinkWriter(Class keyClass, Class valueClass, Map properties) { this.properties = properties; this.keyClass = keyClass; this.valueClass = valueClass; } //this is almost copy-paste from AvroOutputFormatBase.getCompressionCodec(..) private CodecFactory getCompressionCodec(JobConf conf) { if (conf.getBoolean(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, false)) { // Default to deflate compression. int deflateLevel = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, CodecFactory.DEFAULT_DEFLATE_LEVEL); int xzLevel = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.XZ_LEVEL_KEY, CodecFactory.DEFAULT_XZ_LEVEL); String outputCodec = conf.get(AvroJob.CONF_OUTPUT_CODEC); if (outputCodec == null) { String compressionCodec = conf.get("mapred.output.compression.codec"); String avroCodecName = HadoopCodecFactory.getAvroCodecName(compressionCodec); if (avroCodecName != null) { conf.set(AvroJob.CONF_OUTPUT_CODEC, avroCodecName); return HadoopCodecFactory.fromHadoopString(compressionCodec); } else { return CodecFactory.deflateCodec(deflateLevel); } } else if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) { return CodecFactory.deflateCodec(deflateLevel); } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) { return CodecFactory.xzCodec(xzLevel); } else { return CodecFactory.fromString(outputCodec); } } // No compression. 
return CodecFactory.nullCodec(); } @Override public void open(FSDataOutputStream outStream) throws IOException { if (outputStream != null) { throw new IllegalStateException("AvroSinkWriter has already been opened."); } if (keyClass == null) { throw new IllegalStateException("Key Class has not been initialized."); } if (valueClass == null) { throw new IllegalStateException("Value Class has not been initialized."); } this.outputStream = outStream; JobConf config = new JobConf(); for (Map.Entry e : properties.entrySet()) { config.set(e.getKey(), e.getValue()); } //this code is base on AvroKeyValueOutputFormat.getRecordWriter(..) CodecFactory compressionCodec = getCompressionCodec(config); AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(config); AvroDatumConverter keyConverter = converterFactory.create(keyClass); AvroDatumConverter valueConverter = converterFactory.create(valueClass); GenericData dataModel = AvroSerialization.createDataModel(config); writer = new AvroKeyValueRecordWriter(keyConverter, valueConverter, dataModel, compressionCodec, outputStream); } @Override public void flush() throws IOException { if (writer != null) { writer.sync(); } } @Override public void close() throws IOException { if (writer != null) { writer.close(null); } writer = null; outputStream = null; } @Override public void write(Tuple2 element) throws IOException { if (outputStream == null) { throw new IllegalStateException("SequenceFileWriter has not been opened."); } writer.write(element.f0, element.f1); } @Override public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { if (!type.isTupleType()) { throw new IllegalArgumentException("Input TypeInformation is not a tuple type."); } TupleTypeInfoBase tupleType = (TupleTypeInfoBase) type; if (tupleType.getArity() != 2) { throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type."); } TypeInformation keyType = tupleType.getTypeAt(0); TypeInformation valueType = 
tupleType.getTypeAt(1); this.keyClass = keyType.getTypeClass(); this.valueClass = valueType.getTypeClass(); } @Override public Writer> duplicate() { return new AvroSinkWriter(keyClass, valueClass, properties); } }