/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.fs.avro;

import java.io.IOException;
import java.util.Map;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;
/**
* Implementation of an AvroKeyValue writer that can be used in a Sink (for example a RollingSink).
*
* <p>You'll need the avro-mapred dependency (pay attention to the classifier, it works only for hadoop2):
* <pre>{@code
* <dependency>
*   <groupId>org.apache.avro</groupId>
*   <artifactId>avro-mapred</artifactId>
*   <version>1.7.6</version>
*   <classifier>hadoop2</classifier>
* </dependency>
* }</pre>
* <p>And then:
* <pre>{@code
* RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink = new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/path");
* sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
* Map<String, String> properties = new HashMap<>();
* Schema longSchema = Schema.create(Type.LONG);
* String keySchema = longSchema.toString();
* properties.put("avro.schema.output.key", keySchema);
* String valueSchema = longSchema.toString();
* properties.put("avro.schema.output.value", valueSchema);
* properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
* properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
* sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
* sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
* }</pre>
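*
* <p>To actually emit records, attach the sink to a stream of key/value tuples. A minimal
* sketch, assuming a {@code DataStream<Tuple2<AvroKey<Long>, AvroValue<Long>>>} named
* {@code stream} (the stream name is illustrative, not part of this class):
* <pre>{@code
* stream.addSink(sink);
* }</pre>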
* <p>To test with S3, create a core-site.xml (I haven't found another way to test locally):
* <pre>{@code
* <property>
*   <name>fs.s3.impl</name>
*   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
* </property>
* <property>
*   <name>fs.s3a.access.key</name>
*   <value>xxx</value>
* </property>
* <property>
*   <name>fs.s3a.secret.key</name>
*   <value>yyy</value>
* </property>
* <property>
*   <name>fs.s3a.buffer.dir</name>
*   <value>/tmp</value>
* </property>
* }</pre>
* and add the following dependency (not sure what the best option is here):
* <pre>{@code
* <dependency>
*   <groupId>org.apache.hadoop</groupId>
*   <artifactId>hadoop-aws</artifactId>
*   <version>2.7.0</version>
*   <scope>provided</scope>
*   <exclusions>
*     <exclusion>
*       <artifactId>guava</artifactId>
*       <groupId>com.google.guava</groupId>
*     </exclusion>
*   </exclusions>
* </dependency>
* }</pre>
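*
* <p>With that configuration in place, pointing the sink at an S3 path is enough to exercise
* the S3 code path. A minimal sketch (the bucket name is a placeholder):
* <pre>{@code
* RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> s3Sink =
*     new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("s3://my-bucket/avro-out");
* }</pre>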
*/
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
private static final long serialVersionUID = 1L;
private transient FSDataOutputStream outputStream;
private transient AvroKeyValueRecordWriter<K, V> writer;
private Class<K> keyClass;
private Class<V> valueClass;
private final Map<String, String> properties;
/**
* Constructor for the writer.
*
* <p>You can provide properties that will be used to configure the underlying Avro
* key-value writer, passed as a simple properties map (see the example above).
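*
* <p>A minimal sketch of such a map (the {@code Long} schemas are illustrative only):
* <pre>{@code
* Map<String, String> properties = new HashMap<>();
* String longSchema = Schema.create(Type.LONG).toString();
* properties.put("avro.schema.output.key", longSchema);
* properties.put("avro.schema.output.value", longSchema);
* }</pre>
*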
* @param properties configuration properties for the underlying Avro key-value writer
*/
public AvroSinkWriter(Map<String, String> properties) {
this.properties = properties;
}
private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) {
this.properties = properties;
this.keyClass = keyClass;
this.valueClass = valueClass;
}
// This is almost a copy-paste of AvroOutputFormatBase.getCompressionCodec(..): it derives the Avro codec from the Hadoop job configuration.
private CodecFactory getCompressionCodec(JobConf conf) {
if (conf.getBoolean(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, false)) {
// Default to deflate compression.
int deflateLevel = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
CodecFactory.DEFAULT_DEFLATE_LEVEL);
int xzLevel = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.XZ_LEVEL_KEY,
CodecFactory.DEFAULT_XZ_LEVEL);
String outputCodec = conf.get(AvroJob.CONF_OUTPUT_CODEC);
if (outputCodec == null) {
String compressionCodec = conf.get("mapred.output.compression.codec");
String avroCodecName = HadoopCodecFactory.getAvroCodecName(compressionCodec);
if (avroCodecName != null) {
conf.set(AvroJob.CONF_OUTPUT_CODEC, avroCodecName);
return HadoopCodecFactory.fromHadoopString(compressionCodec);
} else {
return CodecFactory.deflateCodec(deflateLevel);
}
} else if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
return CodecFactory.deflateCodec(deflateLevel);
} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
return CodecFactory.xzCodec(xzLevel);
} else {
return CodecFactory.fromString(outputCodec);
}
}
// No compression.
return CodecFactory.nullCodec();
}
@Override
public void open(FSDataOutputStream outStream) throws IOException {
if (outputStream != null) {
throw new IllegalStateException("AvroSinkWriter has already been opened.");
}
if (keyClass == null) {
throw new IllegalStateException("Key Class has not been initialized.");
}
if (valueClass == null) {
throw new IllegalStateException("Value Class has not been initialized.");
}
this.outputStream = outStream;
JobConf config = new JobConf();
for (Map.Entry<String, String> e : properties.entrySet()) {
config.set(e.getKey(), e.getValue());
}
// This code is based on AvroKeyValueOutputFormat.getRecordWriter(..)
CodecFactory compressionCodec = getCompressionCodec(config);
AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(config);
AvroDatumConverter<K, ?> keyConverter = converterFactory.create(keyClass);
AvroDatumConverter<V, ?> valueConverter = converterFactory.create(valueClass);
GenericData dataModel = AvroSerialization.createDataModel(config);
writer = new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter, dataModel, compressionCodec, outputStream);
}
@Override
public void flush() throws IOException {
if (writer != null) {
writer.sync();
}
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close(null);
}
writer = null;
outputStream = null;
}
@Override
public void write(Tuple2<K, V> element) throws IOException {
if (outputStream == null) {
throw new IllegalStateException("SequenceFileWriter has not been opened.");
}
writer.write(element.f0, element.f1);
}
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
if (!type.isTupleType()) {
throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
}
TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
if (tupleType.getArity() != 2) {
throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
}
TypeInformation<?> keyType = tupleType.getTypeAt(0);
TypeInformation<?> valueType = tupleType.getTypeAt(1);
this.keyClass = (Class<K>) keyType.getTypeClass();
this.valueClass = (Class<V>) valueType.getTypeClass();
}
@Override
public Writer<Tuple2<K, V>> duplicate() {
return new AvroSinkWriter<K, V>(keyClass, valueClass, properties);
}
}