/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.fs;
import in.common.utils.FlinkUtil;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sink that emits its input elements to {@link FileSystem} files within buckets. This is integrated with the checkpointing mechanism to provide exactly once
* semantics.
*
* <p>When creating the sink a {@code basePath} must be specified. The base directory contains
* one directory for every bucket. The bucket directories themselves contain several part files, one for each parallel subtask of the sink. These part files
* contain the actual output data.
*
* <p>The sink uses a {@link Bucketer} to determine in which bucket directory each element should
* be written to inside the base directory. The {@code Bucketer} can, for example, use time or a property of the element to determine the bucket directory. The
* default {@code Bucketer} is a {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} which will create one new bucket every hour. You
* can specify a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. For example, use the {@link BasePathBucketer} if you don't want to have buckets
* but still want to write part-files in a fault-tolerant way.
*
* <p>The filenames of the part files contain the part prefix, an optional millisecond timestamp
* (enabled by default; see {@link #disableTimestampPrefix()}), the parallel subtask index of the sink
* and a rolling counter. For example, with the timestamp prefix disabled, the file {@code "part-1-17"} contains the data from {@code subtask 1} of the sink and is the {@code 17th} bucket created
* by that subtask. Per default the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}. When a part file becomes
* bigger than the user-specified batch size the current part file is closed, the part counter is increased and a new part file is created. The batch size
* defaults to {@code 384MB}; this can be configured using {@link #setBatchSize(long)}.
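*
* <p>For example, to roll part files at {@code 128 MB} instead (an illustrative value, not the default):
*
* <pre>{@code
* sink.setBatchSize(1024L * 1024L * 128L); // 128 MB
* }</pre>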
*
* <p>In some scenarios, the open buckets are required to change based on time. In these cases, the sink
* needs to determine when a bucket has become inactive, in order to flush and close the part file. To support this there are two configurable settings:
*
* <ol>
*     <li>the frequency to check for inactive buckets, configured by
*     {@link #setInactiveBucketCheckInterval(long)}, and</li>
*     <li>the minimum amount of time a bucket has to not receive any data before it is considered
*     inactive, configured by {@link #setInactiveBucketThreshold(long)}.</li>
* </ol>
*
* <p>Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
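*
* <p>For instance, both settings can be tightened together (illustrative values, not the defaults):
*
* <pre>{@code
* sink.setInactiveBucketCheckInterval(10 * 1000L)  // look for idle buckets every 10 s
*     .setInactiveBucketThreshold(30 * 1000L);     // close buckets idle for 30 s
* }</pre>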
*
* <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
* The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once semantics and fault-tolerance. The part file that
* is currently being written to is {@code in-progress}. Once a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the
* currently pending files will be moved to {@code finished}.
*
* <p>In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
* had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending} state are transferred into the {@code
* finished} state while any {@code in-progress} files are rolled back, so that they do not contain data that arrived after the checkpoint from which we
* restore. If the {@code FileSystem} supports the {@code truncate()} method this will be used to reset the file back to its previous state. If not, a special
* file with the same name as the part file and the suffix {@code ".valid-length"} will be created that contains the length up to which the file contains valid
* data. When reading the file, it must be ensured that it is only read up to that point. The prefixes and suffixes for the different file states and
* valid-length files can be configured using the appropriate setter method, e.g. {@link #setPendingSuffix(String)}.
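*
* <p>For illustration, a downstream reader could cap consumption at the recorded valid length (a
* minimal sketch assuming the default {@code "_"} prefix and {@code ".valid-length"} suffix; {@code fs}
* and {@code part} stand for the file system and the part-file path):
*
* <pre>{@code
* Path lengthFile = new Path(part.getParent(), "_" + part.getName() + ".valid-length");
* long validLength = fs.getFileStatus(part).getLen(); // without a length file, the whole file is valid
* if (fs.exists(lengthFile)) {
*     try (FSDataInputStream in = fs.open(lengthFile)) {
*         validLength = Long.parseLong(in.readUTF()); // length recorded by the sink
*     }
* }
* // ... read at most validLength bytes of 'part' ...
* }</pre>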
*
* <p>NOTE:
*
* <ol>
*     <li>If checkpointing is not enabled the pending files will never be moved to the finished state.
*     In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work in a
*     non-fault-tolerant way but still provide output without prefixes and suffixes.</li>
*     <li>The part files are written using an instance of {@link Writer}. By default, a
*     {@link StringWriter} is used, which writes the result of {@code toString()} for every element,
*     separated by newlines. You can configure the writer using {@link #setWriter(Writer)}. For
*     example, {@link SequenceFileWriter} can be used to write Hadoop {@code SequenceFiles}.</li>
* </ol>
*
* <p>Example:
*
* <pre>{@code
*     new CustomBucketingSink<Tuple2<IntWritable, Text>>(outPath)
*         .setWriter(new SequenceFileWriter<IntWritable, Text>())
*         .setBucketer(new DateTimeBucketer<Tuple2<IntWritable, Text>>("yyyy-MM-dd--HHmm"))
* }</pre>
*
* <p>This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
*
* @param <T> Type of the elements emitted by this sink
* @see org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer
* @see StringWriter
* @see SequenceFileWriter
*/
public class CustomBucketingSink<T>
extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(CustomBucketingSink.class);
// --------------------------------------------------------------------------------------------
// User configuration values
// --------------------------------------------------------------------------------------------
// These are initialized with some defaults but are meant to be changeable by the user
/**
* The default maximum size of part files (currently {@code 384 MB}).
*/
private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
/**
* The default time between checks for inactive buckets. By default, {@code 60 sec}.
*/
private static final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
/**
* The default threshold (in {@code ms}) for marking a bucket as inactive and closing its part files. By default, {@code 60 sec}.
*/
private static final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
/**
* The suffix for {@code in-progress} part files. These are files we are currently writing to, but which were not yet confirmed by a checkpoint.
*/
private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
/**
* The prefix for {@code in-progress} part files. These are files we are currently writing to, but which were not yet confirmed by a checkpoint.
*/
private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
/**
* The suffix for {@code pending} part files. These are closed files that we are not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
private static final String DEFAULT_PENDING_SUFFIX = ".pending";
/**
* The prefix for {@code pending} part files. These are closed files that we are not currently writing to (inactive or reached {@link #batchSize}), but which
* were not yet confirmed by a checkpoint.
*/
private static final String DEFAULT_PENDING_PREFIX = "_";
/**
* When {@code truncate()} is not supported by the used {@link FileSystem}, we create a file along the part file with this suffix that contains the length up
* to which the part file is valid.
*/
private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
/**
* When {@code truncate()} is not supported by the used {@link FileSystem}, we create a file along the part file with this prefix that contains the length up
* to which the part file is valid.
*/
private static final String DEFAULT_VALID_PREFIX = "_";
/**
* The default prefix for part files.
*/
private static final String DEFAULT_PART_PREFIX = "part";
/**
* The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
*/
private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
/**
* The base {@code Path} that stores all bucket directories.
*/
private final String basePath;
/**
* The {@code Bucketer} that is used to determine the path of bucket directories.
*/
private Bucketer<T> bucketer;
/**
* We have a template and call duplicate() for each parallel writer in open() to get the actual writer that is used for the part files.
*/
private Writer<T> writerTemplate;
private long batchSize = DEFAULT_BATCH_SIZE;
private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
// These are the actually configured prefixes/suffixes
private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
private String pendingPrefix = DEFAULT_PENDING_PREFIX;
private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
private String validLengthPrefix = DEFAULT_VALID_PREFIX;
private String partPrefix = DEFAULT_PART_PREFIX;
/**
* The timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
*/
private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
// --------------------------------------------------------------------------------------------
// Internal fields (not configurable by user)
// --------------------------------------------------------------------------------------------
/**
* We use reflection to get the {@code truncate()} method, as it is only available starting with Hadoop 2.7.
*/
private transient Method refTruncate;
/**
* The state object that is handled by Flink from snapshot/restore. This contains state for every open bucket: the current in-progress part file path, its
* valid length and the pending part files.
*/
private transient State<T> state;
private transient ListState<State<T>> restoredBucketStates;
/**
* User-defined FileSystem parameters.
*/
@Nullable
private Configuration fsConfig;
/**
* The FileSystem reference.
*/
private transient FileSystem fs;
private transient Clock clock;
private transient ProcessingTimeService processingTimeService;
private boolean enableTimestampPrefix = true;
/**
* Creates a new {@code CustomBucketingSink} that writes files to the given base directory.
*
* <p>This uses a {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} as writer.
* The maximum bucket size is set to 384 MB.
*
* @param basePath The directory to which to write the bucket files.
*/
public CustomBucketingSink(String basePath) {
this.basePath = basePath;
this.bucketer = new DateTimeBucketer<>();
this.writerTemplate = new StringWriter<>();
}
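/**
 * Creates the Hadoop {@link FileSystem} to write to for the given path. If the Flink file system
 * for the path already wraps Hadoop and no extra user configuration is given, that instance is
 * reused; otherwise a Hadoop file system is instantiated with the merged configuration.
 */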
public static FileSystem createHadoopFileSystem(
Path path,
@Nullable Configuration extraUserConf) throws IOException {
// try to get the Hadoop File System via the Flink File Systems
// that way we get the proper configuration
final org.apache.flink.core.fs.FileSystem flinkFs = org.apache.flink.core.fs.FileSystem.get(path.toUri());
final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
// fast path: if the Flink file system wraps Hadoop anyways and we need no extra config,
// then we use it directly
if (extraUserConf == null && hadoopFs != null) {
return hadoopFs;
} else {
// we need to re-instantiate the Hadoop file system, because we either have
// a special config, or the Path gave us a Flink FS that is not backed by
// Hadoop (like file://)
final org.apache.hadoop.conf.Configuration hadoopConf;
if (hadoopFs != null) {
// have a Hadoop FS but need to apply extra config
hadoopConf = hadoopFs.getConf();
} else {
// the Path gave us a Flink FS that is not backed by Hadoop (like file://)
// we need to get access to the Hadoop file system first
// we access the Hadoop FS in Flink, which carries the proper
// Hadoop configuration. we should get rid of this once the bucketing sink is
// properly implemented against Flink's FS abstraction
URI genericHdfsUri = URI.create("hdfs://localhost:12345/");
org.apache.flink.core.fs.FileSystem accessor =
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(genericHdfsUri);
if (!(accessor instanceof HadoopFileSystem)) {
throw new IOException(
"Cannot instantiate a Hadoop file system to access the Hadoop configuration. " +
"FS for hdfs:// is " + accessor.getClass().getName());
}
hadoopConf = ((HadoopFileSystem) accessor).getHadoopFileSystem().getConf();
}
// finalize the configuration
final org.apache.hadoop.conf.Configuration finalConf;
if (extraUserConf == null) {
finalConf = hadoopConf;
} else {
finalConf = new org.apache.hadoop.conf.Configuration(hadoopConf);
for (String key : extraUserConf.keySet()) {
finalConf.set(key, extraUserConf.getString(key, null));
}
}
// we explicitly re-instantiate the file system here in order to make sure
// that the configuration is applied.
URI fsUri = path.toUri();
final String scheme = fsUri.getScheme();
final String authority = fsUri.getAuthority();
if (scheme == null && authority == null) {
fsUri = FileSystem.getDefaultUri(finalConf);
} else if (scheme != null && authority == null) {
URI defaultUri = FileSystem.getDefaultUri(finalConf);
if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
fsUri = defaultUri;
}
}
final Class<? extends FileSystem> fsClass = FileSystem.getFileSystemClass(fsUri.getScheme(), finalConf);
final FileSystem fs;
try {
fs = fsClass.newInstance();
} catch (Exception e) {
throw new IOException("Cannot instantiate the Hadoop file system", e);
}
fs.initialize(fsUri, finalConf);
return fs;
}
}
/**
* Specify a custom {@code Configuration} that will be used when creating the {@link FileSystem} for writing.
*/
public CustomBucketingSink<T> setFSConfig(Configuration config) {
this.fsConfig = new Configuration();
fsConfig.addAll(config);
return this;
}
/**
* Specify a custom {@code Configuration} that will be used when creating the {@link FileSystem} for writing.
*/
public CustomBucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
this.fsConfig = new Configuration();
for (Map.Entry<String, String> entry : config) {
fsConfig.setString(entry.getKey(), entry.getValue());
}
return this;
}
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
if (this.writerTemplate instanceof InputTypeConfigurable) {
((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
}
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (State<T> recoveredState : restoredBucketStates.get()) {
handleRestoredBucketState(recoveredState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
}
}
} else {
LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
state = new State<>();
processingTimeService =
((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
this.clock = new Clock() {
@Override
public long currentTimeMillis() {
return processingTimeService.getCurrentProcessingTime();
}
};
}
/**
* Create a file system with the user-defined {@code HDFS} configuration.
*/
private void initFileSystem() throws IOException {
if (fs == null) {
Path path = new Path(basePath);
fs = createHadoopFileSystem(path, fsConfig);
}
}
@Override
public void close() throws Exception {
if (state != null) {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
closeCurrentPartFile(entry.getValue());
}
}
}
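/**
 * Writes the incoming element to its bucket: increments the per-subtask counter via {@code FlinkUtil},
 * resolves the bucket path with the {@link Bucketer}, lazily creates the bucket state, rolls the
 * part file if necessary and updates the bucket's last-written time.
 */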
@Override
public void invoke(T value) throws Exception {
// Added by Sohi
final String counterName = this.getRuntimeContext().getTaskNameWithSubtasks();
FlinkUtil.incrementCounter(counterName, this.getRuntimeContext());
Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
BucketState<T> bucketState = state.getBucketState(bucketPath);
if (bucketState == null) {
bucketState = new BucketState<>(currentProcessingTime);
state.addBucketState(bucketPath, bucketState);
}
if (shouldRoll(bucketState)) {
openNewPartFile(bucketPath, bucketState);
}
bucketState.writer.write(value);
bucketState.lastWrittenToTime = currentProcessingTime;
}
/**
* Returns {@code true} if the current {@code part-file} should be closed and a new one started. This happens if:
*
* <ol>
*     <li>no file has been created yet for the task to write to, or</li>
*     <li>the current file has reached the maximum bucket size.</li>
* </ol>
*/
private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
boolean shouldRoll = false;
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (!bucketState.isWriterOpen) {
shouldRoll = true;
LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
} else {
long writePosition = bucketState.writer.getPos();
if (writePosition > batchSize) {
shouldRoll = true;
LOG.debug(
"BucketingSink {} starting new bucket because file position {} is above batch size {}.",
subtaskIndex,
writePosition,
batchSize);
}
}
return shouldRoll;
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
checkForInactiveBuckets(currentProcessingTime);
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}
/**
* Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been written to for a period greater than {@code
* inactiveBucketThreshold} ms. This enables in-progress files to be moved to the pending state and be finalised on the next checkpoint.
*/
private void checkForInactiveBuckets(long currentProcessingTime) throws Exception {
synchronized (state.bucketStates) {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) {
LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
closeCurrentPartFile(entry.getValue());
}
}
}
}
/**
* Closes the current part file and opens a new one with a new bucket path, as returned by the {@link Bucketer}. If the bucket is not new, then this will
* create a new file with the same path as its predecessor, but with an increased rolling counter (see {@link CustomBucketingSink}).
*/
private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
closeCurrentPartFile(bucketState);
if (!fs.exists(bucketPath)) {
try {
if (fs.mkdirs(bucketPath)) {
LOG.debug("Created new bucket directory: {}", bucketPath);
}
} catch (IOException e) {
throw new RuntimeException("Could not create new bucket path.", e);
}
}
// The following loop tries different partCounter values in ascending order until it reaches the minimum
// that is not yet used. This works since there is only one parallel subtask that tries names with this
// subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now
// clean the base directory in case of rescaling.
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
// Added by sohi
Path partPath = new Path(bucketPath, getChildName(bucketState, subtaskIndex));
while (fs.exists(partPath) ||
fs.exists(getPendingPathFor(partPath)) ||
fs.exists(getInProgressPathFor(partPath))) {
bucketState.partCounter++;
// Added by sohi
partPath = new Path(bucketPath, getChildName(bucketState, subtaskIndex));
}
// increase, so we don't have to check for this name next time
bucketState.partCounter++;
LOG.info("Next part path is {}", partPath.toString());
bucketState.currentFile = partPath.toString();
Path inProgressPath = getInProgressPathFor(partPath);
if (bucketState.writer == null) {
bucketState.writer = writerTemplate.duplicate();
}
bucketState.writer.open(fs, inProgressPath);
bucketState.isWriterOpen = true;
}
// Added by sohi
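/**
 * Builds the part-file name as {@code "{partPrefix}[-{timestampMs}]-{subtaskIndex}-{partCounter}"}.
 * The millisecond-timestamp segment is included unless {@link #disableTimestampPrefix()} was called.
 */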
private String getChildName(BucketState<T> bucketState, int subtaskIndex) {
String timeStr = "";
if (this.enableTimestampPrefix) {
timeStr = "-" + System.currentTimeMillis();
}
return partPrefix + timeStr + "-" + subtaskIndex + "-" + bucketState.partCounter;
}
/**
* Closes the current part file and moves it from the in-progress state to the pending state.
*/
private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
if (bucketState.isWriterOpen) {
bucketState.writer.close();
bucketState.isWriterOpen = false;
}
if (bucketState.currentFile != null) {
Path currentPartPath = new Path(bucketState.currentFile);
Path inProgressPath = getInProgressPathFor(currentPartPath);
Path pendingPath = getPendingPathFor(currentPartPath);
fs.rename(inProgressPath, pendingPath);
LOG.debug("Moving in-progress bucket {} to pending file {}",
inProgressPath,
pendingPath);
bucketState.pendingFiles.add(currentPartPath.toString());
bucketState.currentFile = null;
}
}
/**
* Gets the truncate() call using reflection.
*
* <p>NOTE: This code comes from Flume.
*/
private Method reflectTruncate(FileSystem fs) {
Method m = null;
if (fs != null) {
Class<?> fsClass = fs.getClass();
try {
m = fsClass.getMethod("truncate", Path.class, long.class);
} catch (NoSuchMethodException ex) {
LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
return null;
}
// verify that truncate actually works
FSDataOutputStream outputStream;
Path testPath = new Path(UUID.randomUUID().toString());
try {
outputStream = fs.create(testPath);
outputStream.writeUTF("hello");
outputStream.close();
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works.", e);
}
try {
m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}
try {
fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
throw new RuntimeException("Could not delete truncate test file.", e);
}
}
return m;
}
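// The following helpers derive the companion paths for a part file by prepending the configured
// prefix to the file name and appending the configured suffix to the resulting path.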
private Path getPendingPathFor(Path path) {
return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
}
private Path getInProgressPathFor(Path path) {
return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
}
private Path getValidLengthPathFor(Path path) {
return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
}
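/**
 * Moves all files that were pending for checkpoints up to and including {@code checkpointId} to
 * their final location, and drops bucket states that have become empty in the process.
 */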
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
synchronized (state.bucketStates) {
Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
while (bucketStatesIt.hasNext()) {
BucketState<T> bucketState = bucketStatesIt.next().getValue();
synchronized (bucketState.pendingFilesPerCheckpoint) {
Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
while (pendingCheckpointsIt.hasNext()) {
Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
Long pastCheckpointId = entry.getKey();
List<String> pendingPaths = entry.getValue();
if (pastCheckpointId <= checkpointId) {
LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
for (String filename : pendingPaths) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);
fs.rename(pendingPath, finalPath);
LOG.debug(
"Moving pending file {} to final location having completed checkpoint {}.",
pendingPath,
pastCheckpointId);
}
pendingCheckpointsIt.remove();
}
}
if (!bucketState.isWriterOpen &&
bucketState.pendingFiles.isEmpty() &&
bucketState.pendingFilesPerCheckpoint.isEmpty()) {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
bucketStatesIt.remove();
}
}
}
}
}
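/**
 * Flushes every open part file, records its current valid length in the bucket state and files
 * the pending part files collected since the last checkpoint under the current checkpoint id.
 */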
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
restoredBucketStates.clear();
synchronized (state.bucketStates) {
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
BucketState<T> bucketState = bucketStateEntry.getValue();
if (bucketState.isWriterOpen) {
bucketState.currentFileValidLength = bucketState.writer.flush();
}
synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
}
bucketState.pendingFiles = new ArrayList<>();
}
restoredBucketStates.add(state);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
}
}
}
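/**
 * Restores a snapshotted {@link State}: clears the pending files (the checkpoint we restore from
 * finalized them), truncates or annotates the in-progress file and finalizes files that were
 * still pending for earlier checkpoints.
 */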
private void handleRestoredBucketState(State<T> restoredState) {
Preconditions.checkNotNull(restoredState);
for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
// we can clean all the pending files since they were renamed to
// final files after this checkpoint was successful
// (we re-start from the last **successful** checkpoint)
bucketState.pendingFiles.clear();
handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
// Now that we've restored the bucket to a valid state, reset the current file info
bucketState.currentFile = null;
bucketState.currentFileValidLength = -1;
bucketState.isWriterOpen = false;
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
bucketState.pendingFilesPerCheckpoint.clear();
}
}
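/**
 * Applies the same restore logic to legacy {@link RollingSink} bucket state.
 */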
private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
restoredState.pendingFiles.clear();
handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
// Now that we've restored the bucket to a valid state, reset the current file info
restoredState.currentFile = null;
restoredState.currentFileValidLength = -1;
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
restoredState.pendingFilesPerCheckpoint.clear();
}
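/**
 * Brings a file that was in-progress when the checkpoint was taken back to a consistent state:
 * renames it to its final name and either truncates it to {@code validLength} (if {@code truncate()}
 * is available) or writes a companion valid-length file.
 */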
private void handlePendingInProgressFile(String file, long validLength) {
if (file != null) {
// We were writing to a file when the last checkpoint occurred. This file can either
// be still in-progress or became a pending file at some point after the checkpoint.
// Either way, we have to truncate it back to a valid state (or write a .valid-length
// file that specifies up to which length it is valid) and rename it to the final name
// before starting a new bucket file.
Path partPath = new Path(file);
try {
Path partPendingPath = getPendingPathFor(partPath);
Path partInProgressPath = getInProgressPathFor(partPath);
if (fs.exists(partPendingPath)) {
LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
// has been moved to pending in the mean time, rename to final location
fs.rename(partPendingPath, partPath);
} else if (fs.exists(partInProgressPath)) {
LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
// it was still in progress, rename to final path
fs.rename(partInProgressPath, partPath);
} else if (fs.exists(partPath)) {
LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
} else {
LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
"it was moved to final location by a previous snapshot restore", file);
}
// We use reflection to get the .truncate() method, this
// is only available starting with Hadoop 2.7
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
// truncate it or write a ".valid-length" file to specify up to which point it is valid
if (refTruncate != null) {
LOG.debug("Truncating {} to valid length {}", partPath, validLength);
// some-one else might still hold the lease from a previous try, we are
// recovering, after all ...
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
LOG.debug("Trying to recover file lease {}", partPath);
dfs.recoverLease(partPath);
boolean isclosed = dfs.isFileClosed(partPath);
StopWatch sw = new StopWatch();
sw.start();
while (!isclosed) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
isclosed = dfs.isFileClosed(partPath);
}
}
Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
if (!truncated) {
LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
// we must wait for the asynchronous truncate operation to complete
StopWatch sw = new StopWatch();
sw.start();
long newLen = fs.getFileStatus(partPath).getLen();
while (newLen != validLength) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
newLen = fs.getFileStatus(partPath).getLen();
}
if (newLen != validLength) {
throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
}
}
} else {
// Added by sohi
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(validLength));
lengthFileOut.close();
}
}
} catch (IOException e) {
LOG.error("Error while restoring BucketingSink state.", e);
throw new RuntimeException("Error while restoring BucketingSink state.", e);
} catch (InvocationTargetException | IllegalAccessException e) {
LOG.error("Could not invoke truncate.", e);
throw new RuntimeException("Could not invoke truncate.", e);
}
}
}
private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
// Move files that are confirmed by a checkpoint but did not get moved to final location
// because the checkpoint notification did not happen before a failure
LOG.debug("Moving pending files to final location on restore.");
Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
for (Long pastCheckpointId : pastCheckpointIds) {
// All the pending files are buckets that have been completed but are waiting to be renamed
// to their final name
for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);
try {
if (fs.exists(pendingPath)) {
LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
fs.rename(pendingPath, finalPath);
}
} catch (IOException e) {
LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
}
}
}
}
// --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
/**
* Sets the maximum bucket size in bytes.
*
*
* When a bucket part file becomes larger than this size a new bucket part file is started and
* the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
*
* @param batchSize The bucket part file size in bytes.
*/
public CustomBucketingSink<T> setBatchSize(long batchSize) {
this.batchSize = batchSize;
return this;
}
/**
* Sets the default time between checks for inactive buckets.
*
* @param interval The timeout, in milliseconds.
*/
public CustomBucketingSink<T> setInactiveBucketCheckInterval(long interval) {
this.inactiveBucketCheckInterval = interval;
return this;
}
/**
* Sets the default threshold for marking a bucket as inactive and closing its part files. Buckets which haven't been written to for at least this period of
* time become inactive.
*
* @param threshold The timeout, in milliseconds.
*/
public CustomBucketingSink<T> setInactiveBucketThreshold(long threshold) {
this.inactiveBucketThreshold = threshold;
return this;
}
/**
* Sets the {@link Bucketer} to use for determining the bucket files to write to.
*
* @param bucketer The bucketer to use.
*/
public CustomBucketingSink<T> setBucketer(Bucketer<T> bucketer) {
this.bucketer = bucketer;
return this;
}
/**
* Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
*
* @param writer The {@code Writer} to use.
*/
public CustomBucketingSink<T> setWriter(Writer<T> writer) {
this.writerTemplate = writer;
return this;
}
/**
* Sets the suffix of in-progress part files. The default is {@code ".in-progress"}.
*/
public CustomBucketingSink<T> setInProgressSuffix(String inProgressSuffix) {
this.inProgressSuffix = inProgressSuffix;
return this;
}
/**
* Sets the prefix of in-progress part files. The default is {@code "_"}.
*/
public CustomBucketingSink<T> setInProgressPrefix(String inProgressPrefix) {
this.inProgressPrefix = inProgressPrefix;
return this;
}
/**
* Sets the suffix of pending part files. The default is {@code ".pending"}.
*/
public CustomBucketingSink<T> setPendingSuffix(String pendingSuffix) {
this.pendingSuffix = pendingSuffix;
return this;
}
/**
* Sets the prefix of pending part files. The default is {@code "_"}.
*/
public CustomBucketingSink<T> setPendingPrefix(String pendingPrefix) {
this.pendingPrefix = pendingPrefix;
return this;
}
/**
* Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
*/
public CustomBucketingSink<T> setValidLengthSuffix(String validLengthSuffix) {
this.validLengthSuffix = validLengthSuffix;
return this;
}
/**
* Sets the prefix of valid-length files. The default is {@code "_"}.
*/
public CustomBucketingSink<T> setValidLengthPrefix(String validLengthPrefix) {
this.validLengthPrefix = validLengthPrefix;
return this;
}
/**
* Sets the prefix of part files. The default is {@code "part"}.
*/
public CustomBucketingSink<T> setPartPrefix(String partPrefix) {
this.partPrefix = partPrefix;
return this;
}
/**
* Disable cleanup of leftover in-progress/pending files when the sink is opened.
*
* <p>This should only be disabled if using the sink without checkpoints, to not remove
* the files already in the directory.
*
* @deprecated This option is deprecated and remains only for backwards compatibility. We do not clean up lingering files anymore.
*/
@Deprecated
public CustomBucketingSink<T> disableCleanupOnOpen() {
return this;
}
/**
* Sets the default timeout for asynchronous operations such as recoverLease and truncate.
*
* @param timeout The timeout, in milliseconds.
*/
public CustomBucketingSink<T> setAsyncTimeout(long timeout) {
this.asyncTimeout = timeout;
return this;
}
@VisibleForTesting
public State<T> getState() {
return state;
}
// Added by sohi
public CustomBucketingSink<T> disableTimestampPrefix() {
this.enableTimestampPrefix = false;
return this;
}
// --------------------------------------------------------------------------------------------
// Internal Classes
// --------------------------------------------------------------------------------------------
/**
* This is used during snapshot/restore to keep track of in-progress buckets. For each bucket, we maintain a state.
*/
static final class State<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* For every bucket directory (key), we maintain a bucket state (value).
*/
final Map<String, BucketState<T>> bucketStates = new HashMap<>();
void addBucketState(Path bucketPath, BucketState<T> state) {
synchronized (bucketStates) {
bucketStates.put(bucketPath.toString(), state);
}
}
BucketState<T> getBucketState(Path bucketPath) {
synchronized (bucketStates) {
return bucketStates.get(bucketPath.toString());
}
}
@Override
public String toString() {
return bucketStates.toString();
}
}
/**
* This is used for keeping track of the current in-progress buckets and files that we mark for moving from pending to final location after we get a
* checkpoint-complete notification.
*/
static final class BucketState<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* When doing a checkpoint we move the pending files since the last checkpoint to this map with the id of the checkpoint. When we get the
* checkpoint-complete notification we move pending files of completed checkpoints to their final location.
*/
final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
/**
* The file that was in-progress when the last checkpoint occurred.
*/
String currentFile;
/**
* The valid length of the in-progress file at the time of the last checkpoint.
*/
long currentFileValidLength = -1;
/**
* The time this bucket was last written to.
*/
long lastWrittenToTime;
/**
* Pending files that accumulated since the last checkpoint.
*/
List<String> pendingFiles = new ArrayList<>();
/**
* For counting the part files inside a bucket directory. Part files follow the pattern {@code "{part-prefix}[-{timestamp}]-{subtask}-{count}"}. When creating new part
* files we increase the counter.
*/
private transient int partCounter;
/**
* Tracks if the writer is currently opened or closed.
*/
private transient boolean isWriterOpen;
/**
* The actual writer that we use for writing the part files.
*/
private transient Writer<T> writer;
BucketState(long lastWrittenToTime) {
this.lastWrittenToTime = lastWrittenToTime;
}
@Override
public String toString() {
return
"In-progress=" + currentFile +
" validLength=" + currentFileValidLength +
" pendingForNextCheckpoint=" + pendingFiles +
" pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint +
" lastModified@" + lastWrittenToTime;
}
}
}