/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package de.helaba.rtts.ice.rocksdb.ttl;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;

import org.rocksdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
 * streams provided by a {@link CheckpointStreamFactory} upon checkpointing. This state backend can store
 * very large state that exceeds memory and spills to disk. Except for the snapshotting, this class should
 * be accessed as if it is not threadsafe.
 *
 * <p>This class follows the rules for closing/releasing native RocksDB resources as described in
 * <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families">
 * this document</a>.
 */
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {

	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);

	/** The name of the merge operator in RocksDB. Do not change this unless you know exactly what you are doing. */
	public static final String MERGE_OPERATOR_NAME = "stringappendtest";

	/** File suffix of sstable files. */
	private static final String SST_FILE_SUFFIX = ".sst";

	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(
		Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create),
		Tuple2.of(ListStateDescriptor.class, (StateFactory) RocksDBListState::create),
		Tuple2.of(MapStateDescriptor.class, (StateFactory) RocksDBMapState::create),
		Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) RocksDBAggregatingState::create),
		Tuple2.of(ReducingStateDescriptor.class, (StateFactory) RocksDBReducingState::create),
		Tuple2.of(FoldingStateDescriptor.class, (StateFactory) RocksDBFoldingState::create)
	).collect(Collectors.toMap(t -> t.f0, t -> t.f1));

	private interface StateFactory {
		<K, N, SV, S extends State, IS extends S> IS createState(
			StateDescriptor<S, SV> stateDesc,
			Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
			RocksDBKeyedStateBackend<K> backend) throws Exception;
	}

	/** String that identifies the operator that owns this backend. */
	private final String operatorIdentifier;

	/** The column family options from the options factory. */
	private final ColumnFamilyOptions columnOptions;

	/** The DB options from the options factory. */
	private final DBOptions dbOptions;

	/** Path where this configured instance stores its data directory. */
	private final File instanceBasePath;

	/** Path where this configured instance stores its RocksDB database. */
	private final File instanceRocksDBPath;

	/**
	 * Protects access to RocksDB from other threads, e.g. the checkpointing thread, against a
	 * parallel call that disposes the RocksDB object.
	 */
	private final ResourceGuard rocksDBResourceGuard;

	/**
	 * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
	 * to store state. The different k/v states that we have don't each have their own RocksDB
	 * instance. They all write to this instance but to their own column family.
	 */
	protected TtlDB db;

	/**
	 * We are not using the default column family for Flink state ops, but we still need to remember this handle so that
	 * we can close it properly when the backend is closed. This is required by RocksDB's native memory management.
	 */
	private ColumnFamilyHandle defaultColumnFamily;

	/**
	 * The write options to use in the states. We disable write ahead logging.
	 */
	private final WriteOptions writeOptions;

	/**
	 * Information about the k/v states as we create them. This is used to retrieve the
	 * column family that is used for a state and also for sanity checks when restoring.
	 */
	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;

	/**
	 * Map of state names to their corresponding restored state meta info.
	 *
	 * <p>TODO this map can be removed when eager-state registration is in place.
	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
	 */
	private final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos;

	/** Number of bytes required to prefix the key groups. */
	private final int keyGroupPrefixBytes;

	/** True if incremental checkpointing is enabled. */
	private final boolean enableIncrementalCheckpointing;

	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;

	/** The identifier of the last completed checkpoint. */
	private long lastCompletedCheckpointId = -1L;

	/** Unique ID of this backend. */
	private UUID backendUID;

	/** The configuration of local recovery. */
	private final LocalRecoveryConfig localRecoveryConfig;

	/** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */
	private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;

	/** Factory for priority queue state. */
	private final PriorityQueueSetFactory priorityQueueFactory;

	/** Shared wrapper for batch writes to the RocksDB instance. */
	private RocksDBWriteBatchWrapper writeBatchWrapper;

	/** Time-to-live, in seconds, that is applied to all column families of this backend. */
	private final int ttl;

	public RocksDBKeyedStateBackend(
		String operatorIdentifier,
		ClassLoader userCodeClassLoader,
		File instanceBasePath,
		DBOptions dbOptions,
		ColumnFamilyOptions columnFamilyOptions,
		TaskKvStateRegistry kvStateRegistry,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		ExecutionConfig executionConfig,
		boolean enableIncrementalCheckpointing,
		LocalRecoveryConfig localRecoveryConfig,
		RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
		TtlTimeProvider ttlTimeProvider,
		int ttl) throws IOException {

		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);

		this.ttl = ttl;
		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);

		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
		this.rocksDBResourceGuard = new ResourceGuard();

		// ensure that we use the right merge operator, because other code relies on this
		this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions)
			.setMergeOperatorName(MERGE_OPERATOR_NAME);

		this.dbOptions = Preconditions.checkNotNull(dbOptions);

		this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
		this.instanceRocksDBPath = new File(instanceBasePath, "db");

		checkAndCreateDirectory(instanceBasePath);

		if (instanceRocksDBPath.exists()) {
			// Clear the base directory when the backend is created
			// in case something crashed and the backend never reached dispose()
			cleanInstanceBasePath();
		}

		this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
		this.keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
		this.kvStateInformation = new LinkedHashMap<>();
		this.restoredKvStateMetaInfos = new HashMap<>();
		this.materializedSstFiles = new TreeMap<>();
		this.backendUID = UUID.randomUUID();

		this.snapshotStrategy = enableIncrementalCheckpointing ?
			new IncrementalSnapshotStrategy() :
			new FullSnapshotStrategy();
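
		// Durability of state is provided by Flink's checkpoints rather than by RocksDB's
		// write-ahead log (see the writeOptions field above), which is why the WAL can safely
		// be disabled for all writes issued by this backend.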
		this.writeOptions = new WriteOptions().setDisableWAL(true);

		switch (priorityQueueStateType) {
			case HEAP:
				this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
				break;
			case ROCKSDB:
				this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
				break;
			default:
				throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
		}

		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
	}

	private static void checkAndCreateDirectory(File directory) throws IOException {
		if (directory.exists()) {
			if (!directory.isDirectory()) {
				throw new IOException("Not a directory: " + directory);
			}
		} else {
			if (!directory.mkdirs()) {
				throw new IOException(
					String.format("Could not create RocksDB data directory at %s.", directory));
			}
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public <N> Stream<K> getKeys(String state, N namespace) {
		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = kvStateInformation.get(state);
		if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
			return Stream.empty();
		}

		RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;

		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
		final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
		final byte[] nameSpaceBytes;
		try {
			RocksDBKeySerializationUtils.writeNameSpace(
				namespace,
				namespaceSerializer,
				namespaceOutputStream,
				new DataOutputViewStreamWrapper(namespaceOutputStream),
				ambiguousKeyPossible);
			nameSpaceBytes = namespaceOutputStream.toByteArray();
		} catch (IOException ex) {
			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
		}

		RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
		iterator.seekToFirst();

		final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(
			iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);

		Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
		return targetStream.onClose(iteratorWrapper::close);
	}

	@VisibleForTesting
	ColumnFamilyHandle getColumnFamilyHandle(String state) {
		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
		return columnInfo != null ? columnInfo.f0 : null;
	}

	/**
	 * Should only be called by one thread, and only after all accesses to the DB happened.
	 */
	@Override
	public void dispose() {
		super.dispose();

		// This call will block until all clients that still acquire access to the RocksDB instance have released it,
		// so that we cannot release the native resources while clients are still working with it in parallel.
		rocksDBResourceGuard.close();

		// IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as
		// working on the disposed object results in SEGFAULTS.
		if (db != null) {

			IOUtils.closeQuietly(writeBatchWrapper);

			// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
			// DB is closed.
			// See: https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
			// Start with default CF ...
			IOUtils.closeQuietly(defaultColumnFamily);

			// ... continue with the ones created by Flink...
			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData : kvStateInformation.values()) {
				IOUtils.closeQuietly(columnMetaData.f0);
			}

			// ... and finally close the DB instance ...
			IOUtils.closeQuietly(db);

			// invalidate the reference
			db = null;

			IOUtils.closeQuietly(columnOptions);
			IOUtils.closeQuietly(dbOptions);
			IOUtils.closeQuietly(writeOptions);

			kvStateInformation.clear();
			restoredKvStateMetaInfos.clear();

			cleanInstanceBasePath();
		}
	}

	@Nonnull
	@Override
	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
		@Nonnull String stateName,
		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
		return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
	}

	private void cleanInstanceBasePath() {
		LOG.info("Deleting existing instance base directory {}.", instanceBasePath);

		try {
			FileUtils.deleteDirectory(instanceBasePath);
		} catch (IOException ex) {
			LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex);
		}
	}

	public int getKeyGroupPrefixBytes() {
		return keyGroupPrefixBytes;
	}

	@VisibleForTesting
	PriorityQueueSetFactory getPriorityQueueFactory() {
		return priorityQueueFactory;
	}

	public WriteOptions getWriteOptions() {
		return writeOptions;
	}

	/**
	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
	 * be called by the same thread.
	 *
	 * @param checkpointId The Id of the checkpoint.
	 * @param timestamp The timestamp of the checkpoint.
	 * @param streamFactory The factory that we can use for writing our state to streams.
	 * @param checkpointOptions Options for how to perform this checkpoint.
	 * @return Future to the state handle of the snapshot data.
	 * @throws Exception indicating a problem in the synchronous part of the checkpoint.
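	 *
	 * <p>A minimal sketch of how a caller might drive this method; in practice the invocation is
	 * performed by Flink's task runtime, and the {@code backend} and {@code streamFactory} objects
	 * are assumed to be set up elsewhere:
	 * <pre>{@code
	 * RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFuture = backend.snapshot(
	 *     checkpointId, System.currentTimeMillis(), streamFactory,
	 *     CheckpointOptions.forCheckpointWithDefaultLocation());
	 * snapshotFuture.run(); // executes the asynchronous part of the snapshot
	 * SnapshotResult<KeyedStateHandle> result = snapshotFuture.get();
	 * }</pre>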
	 */
	@Override
	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
		final long checkpointId,
		final long timestamp,
		final CheckpointStreamFactory streamFactory,
		CheckpointOptions checkpointOptions) throws Exception {

		// flush everything into db before taking a snapshot
		writeBatchWrapper.flush();

		return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
	}

	@Override
	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
		LOG.info("Initializing RocksDB keyed state backend.");

		if (LOG.isDebugEnabled()) {
			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
		}

		// clear all meta data
		kvStateInformation.clear();
		restoredKvStateMetaInfos.clear();

		try {
			if (restoreState == null || restoreState.isEmpty()) {
				createDB();
			} else {
				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
				if (firstStateHandle instanceof IncrementalKeyedStateHandle
					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
					RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
					restoreOperation.restore(restoreState);
				} else {
					RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
					restoreOperation.doRestore(restoreState);
				}
			}
		} catch (Exception ex) {
			dispose();
			throw ex;
		}
	}

	@Override
	public void notifyCheckpointComplete(long completedCheckpointId) {
		if (!enableIncrementalCheckpointing) {
			return;
		}

		synchronized (materializedSstFiles) {
			if (completedCheckpointId < lastCompletedCheckpointId) {
				return;
			}

			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);

			lastCompletedCheckpointId = completedCheckpointId;
		}
	}

	private void createDB() throws IOException {
		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
		this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
		this.defaultColumnFamily = columnFamilyHandles.get(0);
	}

	private TtlDB openDB(
		String path,
		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {

		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());

		// we add the required descriptor for the default CF in FIRST position, see
		// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(TtlDB.DEFAULT_COLUMN_FAMILY, columnOptions));
		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);

		TtlDB dbRef;

		try {
			dbRef = TtlDB.open(
				Preconditions.checkNotNull(dbOptions),
				Preconditions.checkNotNull(path),
				columnFamilyDescriptors,
				stateColumnFamilyHandles,
				Collections.nCopies(columnFamilyDescriptors.size(), ttl),
				false);
		} catch (RocksDBException e) {
			throw new IOException("Error while opening RocksDB instance.", e);
		}

		// requested + default CF
		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
			"Not all requested column family handles have been created");

		return dbRef;
	}

	/**
	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot.
	 */
	private static final class RocksDBFullRestoreOperation<K> {

		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;

		/** Current key-groups state handle from which we restore key-groups. */
		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
		private FSDataInputStream currentStateHandleInStream;
		/** Current data input view that wraps currentStateHandleInStream. */
		private DataInputView currentStateHandleInView;
		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
		/** The compression decorator that was used for writing the state, as determined by the meta data. */
		private StreamCompressionDecorator keygroupStreamCompressionDecorator;

		/**
		 * Creates a restore operation object for the given state backend instance.
		 *
		 * @param rocksDBKeyedStateBackend the state backend into which we restore
		 */
		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
		}

		/**
		 * Restores all key-groups data that is referenced by the passed state handles.
		 *
		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
		 */
		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
			throws IOException, StateMigrationException, RocksDBException {

			rocksDBKeyedStateBackend.createDB();

			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
				if (keyedStateHandle != null) {

					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
						throw new IllegalStateException("Unexpected state handle type, " +
							"expected: " + KeyGroupsStateHandle.class +
							", but found: " + keyedStateHandle.getClass());
					}
					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
					restoreKeyGroupsInStateHandle();
				}
			}
		}

		/**
		 * Restore one key groups state handle.
		 */
		private void restoreKeyGroupsInStateHandle()
			throws IOException, StateMigrationException, RocksDBException {
			try {
				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
				restoreKVStateMetaData();
				restoreKVStateData();
			} finally {
				if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
					IOUtils.closeQuietly(currentStateHandleInStream);
				}
			}
		}

		/**
		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
		 *
		 * @throws IOException
		 * @throws StateMigrationException
		 * @throws RocksDBException
		 */
		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {

			// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
			// deserialization of state happens lazily during runtime; we depend on the fact
			// that the new serializer for states could be compatible, and therefore the restore can continue
			// without old serializers required to be present.
			KeyedBackendSerializationProxy<K> serializationProxy =
				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);

			serializationProxy.read(currentStateHandleInView);

			// check for key serializer compatibility; this also reconfigures the
			// key serializer to be compatible, if it is required and is possible
			if (CompatibilityUtil.resolveCompatibilityResult(
				serializationProxy.getKeySerializer(),
				UnloadableDummyTypeSerializer.class,
				serializationProxy.getKeySerializerConfigSnapshot(),
				rocksDBKeyedStateBackend.keySerializer)
				.isRequiresMigration()) {

				// TODO replace with state migration; note that key hash codes need to remain the same after migration
				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
					"Aborting now since state migration is currently not available");
			}

			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

			List<StateMetaInfoSnapshot> restoredMetaInfos =
				serializationProxy.getStateMetaInfoSnapshots();

			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());

			for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {

				Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());

				if (registeredColumn == null) {
					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);

					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
						nameBytes,
						rocksDBKeyedStateBackend.columnOptions);

					RegisteredStateMetaInfoBase stateMetaInfo =
						RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);

					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);

					ColumnFamilyHandle columnFamily =
						rocksDBKeyedStateBackend.db.createColumnFamilyWithTtl(columnFamilyDescriptor, rocksDBKeyedStateBackend.ttl);

					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
				} else {
					// TODO with eager state registration in place, check here for serializer migration strategies
				}
				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
			}
		}

		/**
		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
		 *
		 * @throws IOException
		 * @throws RocksDBException
		 */
		private void restoreKVStateData() throws IOException, RocksDBException {
			//for all key-groups in the current state handle...
			try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
				for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
					int keyGroup = keyGroupOffset.f0;

					// Check that restored key groups all belong to the backend
					Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
						"The key group must belong to the backend");

					long offset = keyGroupOffset.f1;
					//not empty key-group?
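					// (reading of the code suggests that an offset of 0 marks a key-group without data;
					// the offsets come from the KeyGroupRangeOffsets written when the snapshot was taken)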
					if (0L != offset) {
						currentStateHandleInStream.seek(offset);
						try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
							DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
							int kvStateId = compressedKgInputView.readShort();
							ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
							//insert all k/v pairs into DB
							boolean keyGroupHasMoreKeys = true;
							while (keyGroupHasMoreKeys) {
								byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
								byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
								if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
									//clear the signal bit in the key to make it ready for insertion again
									RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
									writeBatchWrapper.put(handle, key, value);
									//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
									kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
										& compressedKgInputView.readShort();
									if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
										keyGroupHasMoreKeys = false;
									} else {
										handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
									}
								} else {
									writeBatchWrapper.put(handle, key, value);
								}
							}
						}
					}
				}
			}
		}
	}

	/**
	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot.
	 */
	private static class RocksDBIncrementalRestoreOperation<T> {

		private final RocksDBKeyedStateBackend<T> stateBackend;

		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
			this.stateBackend = stateBackend;
		}

		/**
		 * Root method that branches for different implementations of {@link KeyedStateHandle}.
		 */
		void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

			if (restoreStateHandles.isEmpty()) {
				return;
			}

			final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();

			boolean isRescaling = (restoreStateHandles.size() > 1 ||
				!Objects.equals(theFirstStateHandle.getKeyGroupRange(), stateBackend.keyGroupRange));

			if (!isRescaling) {
				restoreWithoutRescaling(theFirstStateHandle);
			} else {
				restoreWithRescaling(restoreStateHandles);
			}
		}

		/**
		 * Recovery from a single remote incremental state without rescaling.
		 */
		void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {

			IncrementalLocalKeyedStateHandle localKeyedStateHandle;
			List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
			List<ColumnFamilyDescriptor> columnFamilyDescriptors;

			// Recovery from remote incremental state.
			Path temporaryRestoreInstancePath = new Path(
				stateBackend.instanceBasePath.getAbsolutePath(),
				UUID.randomUUID().toString());

			try {
				if (rawStateHandle instanceof IncrementalKeyedStateHandle) {

					IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;

					// read state data.
					transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);

					stateMetaInfoSnapshots = readMetaData(restoreStateHandle.getMetaStateHandle());
					columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);

					// since we transferred all remote state to a local directory, we can use the same code as for
					// local recovery.
					localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
						restoreStateHandle.getBackendIdentifier(),
						restoreStateHandle.getCheckpointId(),
						new DirectoryStateHandle(temporaryRestoreInstancePath),
						restoreStateHandle.getKeyGroupRange(),
						restoreStateHandle.getMetaStateHandle(),
						restoreStateHandle.getSharedState().keySet());
				} else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
					// Recovery from local incremental state.
					localKeyedStateHandle = (IncrementalLocalKeyedStateHandle) rawStateHandle;
					stateMetaInfoSnapshots = readMetaData(localKeyedStateHandle.getMetaDataState());
					columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
				} else {
					throw new IllegalStateException("Unexpected state handle type, " +
						"expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
						", but found " + rawStateHandle.getClass());
				}

				restoreLocalStateIntoFullInstance(
					localKeyedStateHandle,
					columnFamilyDescriptors,
					stateMetaInfoSnapshots);
			} finally {
				FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
				if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
					restoreFileSystem.delete(temporaryRestoreInstancePath, true);
				}
			}
		}

		/**
		 * Recovery from multiple incremental states with rescaling. For rescaling, this method creates a temporary
		 * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
		 * real restore instance and then the temporary instance is discarded.
		 */
		void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

			initTargetDB(restoreStateHandles, stateBackend.keyGroupRange);

			byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
			RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getStartKeyGroup(), startKeyGroupPrefixBytes);

			byte[] stopKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
			RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

			for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

				if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
					throw new IllegalStateException("Unexpected state handle type, " +
						"expected " + IncrementalKeyedStateHandle.class +
						", but found " + rawStateHandle.getClass());
				}

				Path temporaryRestoreInstancePath = new Path(
					stateBackend.instanceBasePath.getAbsolutePath(),
					UUID.randomUUID().toString());

				try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
						(IncrementalKeyedStateHandle) rawStateHandle,
						temporaryRestoreInstancePath);
					RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {

					List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
					List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

					// iterating only the requested descriptors automatically skips the default column family handle
					for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
						ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
						ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);

						ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle(
							tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));

						try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
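
							// Copy only this backend's slice of the key space: seek to the first key carrying
							// the start key-group prefix and stop once keys reach the (exclusive) stop prefix.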
							iterator.seek(startKeyGroupPrefixBytes);

							while (iterator.isValid()) {
								if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
									writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
								} else {
									// Since the iterator will visit the record according to the sorted order,
									// we can just break here.
									break;
								}
								iterator.next();
							}
						} // releases native iterator resources
					}
				} finally {
					FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
					if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
						restoreFileSystem.delete(temporaryRestoreInstancePath, true);
					}
				}
			}
		}

		private class RestoredDBInstance implements AutoCloseable {

			@Nonnull
			private final TtlDB db;

			@Nonnull
			private final ColumnFamilyHandle defaultColumnFamilyHandle;

			@Nonnull
			private final List<ColumnFamilyHandle> columnFamilyHandles;

			@Nonnull
			private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;

			@Nonnull
			private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

			RestoredDBInstance(
				@Nonnull TtlDB db,
				@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
				@Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
				@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
				this.db = db;
				this.columnFamilyHandles = columnFamilyHandles;
				this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0);
				this.columnFamilyDescriptors = columnFamilyDescriptors;
				this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
			}

			@Override
			public void close() {
				IOUtils.closeQuietly(defaultColumnFamilyHandle);
				for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
					IOUtils.closeQuietly(columnFamilyHandle);
				}
				IOUtils.closeQuietly(db);
			}
		}

		private RestoredDBInstance restoreDBInstanceFromStateHandle(
			IncrementalKeyedStateHandle restoreStateHandle,
			Path temporaryRestoreInstancePath) throws Exception {

			transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);

			// read meta data
			List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
				readMetaData(restoreStateHandle.getMetaStateHandle());

			List<ColumnFamilyDescriptor> columnFamilyDescriptors =
				createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);

			List<ColumnFamilyHandle> columnFamilyHandles =
				new ArrayList<>(stateMetaInfoSnapshots.size() + 1);

			TtlDB restoreDb = stateBackend.openDB(
				temporaryRestoreInstancePath.getPath(),
				columnFamilyDescriptors,
				columnFamilyHandles);

			return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
		}

		private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
			ColumnFamilyDescriptor columnFamilyDescriptor,
			ColumnFamilyHandle columnFamilyHandle,
			StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {

			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredStateMetaInfoEntry =
				stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());

			if (null == registeredStateMetaInfoEntry) {
				RegisteredStateMetaInfoBase stateMetaInfo =
					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
				registeredStateMetaInfoEntry = new Tuple2<>(
					columnFamilyHandle != null ? columnFamilyHandle
						: stateBackend.db.createColumnFamilyWithTtl(columnFamilyDescriptor, stateBackend.ttl),
					stateMetaInfo);
				stateBackend.kvStateInformation.put(
					stateMetaInfoSnapshot.getName(),
					registeredStateMetaInfoEntry);
			}

			return registeredStateMetaInfoEntry.f0;
		}

		/**
		 * This method first tries to find an initial handle with which to initialize the target db. If such a
		 * handle exists, we initialize the target db from it and clip the db to the target key-group range.
		 * If no initial handle is found, we create an empty db as the target db.
		 */
		private void initTargetDB(
			Collection<KeyedStateHandle> restoreStateHandles,
			KeyGroupRange targetKeyGroupRange) throws Exception {

			IncrementalKeyedStateHandle initialHandle = (IncrementalKeyedStateHandle) RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
				restoreStateHandles, targetKeyGroupRange);

			if (initialHandle != null) {
				restoreStateHandles.remove(initialHandle);
				RestoredDBInstance restoreDBInfo = null;
				Path instancePath = new Path(stateBackend.instanceRocksDBPath.getAbsolutePath());
				try {
					restoreDBInfo = restoreDBInstanceFromStateHandle(
						initialHandle,
						instancePath);

					RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
						restoreDBInfo.db,
						restoreDBInfo.columnFamilyHandles,
						targetKeyGroupRange,
						initialHandle.getKeyGroupRange(),
						stateBackend.keyGroupPrefixBytes);

					stateBackend.db = restoreDBInfo.db;
					stateBackend.defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
					stateBackend.writeBatchWrapper =
						new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);

					for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
						getOrRegisterColumnFamilyHandle(
							restoreDBInfo.columnFamilyDescriptors.get(i),
							restoreDBInfo.columnFamilyHandles.get(i),
							restoreDBInfo.stateMetaInfoSnapshots.get(i));
					}
				} catch (Exception e) {
					if (restoreDBInfo != null) {
						restoreDBInfo.close();
					}
					FileSystem restoreFileSystem = instancePath.getFileSystem();
					if (restoreFileSystem.exists(instancePath)) {
						restoreFileSystem.delete(instancePath, true);
					}
					throw e;
				}
			} else {
				List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
				stateBackend.db = stateBackend.openDB(
					stateBackend.instanceRocksDBPath.getAbsolutePath(),
					Collections.emptyList(),
					columnFamilyHandles);
				stateBackend.defaultColumnFamily = columnFamilyHandles.get(0);
				stateBackend.writeBatchWrapper =
					new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
			}
		}

		/**
		 * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot.
		 */
		private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
			List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {

			List<ColumnFamilyDescriptor> columnFamilyDescriptors =
				new ArrayList<>(stateMetaInfoSnapshots.size());

			for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {

				ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
					stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
					stateBackend.columnOptions);

				columnFamilyDescriptors.add(columnFamilyDescriptor);
				stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
			}
			return columnFamilyDescriptors;
		}

		/**
		 * This method implements the core of the restore logic that unifies how local and remote state are recovered.
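		 *
		 * <p>In short: the backend adopts the backend id of the snapshot, hard-links/copies the files of the
		 * local state directory into the instance directory, opens the database on top of those files and
		 * re-registers one column family per restored state meta info snapshot.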
		 */
		private void restoreLocalStateIntoFullInstance(
			IncrementalLocalKeyedStateHandle restoreStateHandle,
			List<ColumnFamilyDescriptor> columnFamilyDescriptors,
			List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {

			// pick up again the old backend id, so that we can reference existing state
			stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();

			LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
				stateBackend.operatorIdentifier, stateBackend.backendUID);

			// create hard links in the instance directory
			if (!stateBackend.instanceRocksDBPath.mkdirs()) {
				throw new IOException("Could not create RocksDB data directory.");
			}

			Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory();
			restoreInstanceDirectoryFromPath(restoreSourcePath);

			List<ColumnFamilyHandle> columnFamilyHandles =
				new ArrayList<>(1 + columnFamilyDescriptors.size());

			stateBackend.db = stateBackend.openDB(
				stateBackend.instanceRocksDBPath.getAbsolutePath(),
				columnFamilyDescriptors, columnFamilyHandles);

			// extract and store the default column family which is located at the first index
			stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);

			stateBackend.writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);

			for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
				StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);

				ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
				RegisteredStateMetaInfoBase stateMetaInfo =
					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);

				stateBackend.kvStateInformation.put(
					stateMetaInfoSnapshot.getName(),
					new Tuple2<>(columnFamilyHandle, stateMetaInfo));
			}

			// use the restore sst files as the base for succeeding checkpoints
			synchronized (stateBackend.materializedSstFiles) {
				stateBackend.materializedSstFiles.put(
					restoreStateHandle.getCheckpointId(),
					restoreStateHandle.getSharedStateHandleIDs());
			}

			stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
		}

		/**
		 * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from
		 * a local state.
		 */
		private void restoreInstanceDirectoryFromPath(Path source) throws IOException {

			FileSystem fileSystem = source.getFileSystem();

			final FileStatus[] fileStatuses = fileSystem.listStatus(source);

			if (fileStatuses == null) {
				throw new IOException("Cannot list file statuses. Directory " + source + " does not exist.");
			}

			for (FileStatus fileStatus : fileStatuses) {
				final Path filePath = fileStatus.getPath();
				final String fileName = filePath.getName();
				File restoreFile = new File(source.getPath(), fileName);
				File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName);
				if (fileName.endsWith(SST_FILE_SUFFIX)) {
					// hardlink'ing the immutable sst-files.
					Files.createLink(targetFile.toPath(), restoreFile.toPath());
				} else {
					// true copy for all other files.
					Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
				}
			}
		}

		/**
		 * Reads Flink's state meta data file from the state handle.
		 */
		private List<StateMetaInfoSnapshot> readMetaData(
			StreamStateHandle metaStateHandle) throws Exception {

			FSDataInputStream inputStream = null;

			try {
				inputStream = metaStateHandle.openInputStream();
				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);

				// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
				// deserialization of state happens lazily during runtime; we depend on the fact
				// that the new serializer for states could be compatible, and therefore the restore can continue
				// without old serializers required to be present.
				KeyedBackendSerializationProxy<T> serializationProxy =
					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader, false);
				DataInputView in = new DataInputViewStreamWrapper(inputStream);
				serializationProxy.read(in);

				// check for key serializer compatibility; this also reconfigures the
				// key serializer to be compatible, if it is required and is possible
				if (CompatibilityUtil.resolveCompatibilityResult(
					serializationProxy.getKeySerializer(),
					UnloadableDummyTypeSerializer.class,
					serializationProxy.getKeySerializerConfigSnapshot(),
					stateBackend.keySerializer)
					.isRequiresMigration()) {

					// TODO replace with state migration; note that key hash codes need to remain the same after migration
					throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
						"Aborting now since state migration is currently not available");
				}

				return serializationProxy.getStateMetaInfoSnapshots();
			} finally {
				if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
					inputStream.close();
				}
			}
		}

		private void transferAllStateDataToDirectory(
			IncrementalKeyedStateHandle restoreStateHandle,
			Path dest) throws IOException {

			final Map<StateHandleID, StreamStateHandle> sstFiles =
				restoreStateHandle.getSharedState();
			final Map<StateHandleID, StreamStateHandle> miscFiles =
				restoreStateHandle.getPrivateState();

			transferAllDataFromStateHandles(sstFiles, dest);
			transferAllDataFromStateHandles(miscFiles, dest);
		}

		/**
		 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
		 * {@link StateHandleID}.
		 */
		private void transferAllDataFromStateHandles(
			Map<StateHandleID, StreamStateHandle> stateHandleMap,
			Path restoreInstancePath) throws IOException {

			for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
				StateHandleID stateHandleID = entry.getKey();
				StreamStateHandle remoteFileHandle = entry.getValue();
				copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
			}
		}

		/**
		 * Copies the file from a single state handle to the given path.
		 */
		private void copyStateDataHandleData(
			Path restoreFilePath,
			StreamStateHandle remoteFileHandle) throws IOException {

			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();

			FSDataInputStream inputStream = null;
			FSDataOutputStream outputStream = null;

			try {
				inputStream = remoteFileHandle.openInputStream();
				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);

				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
				stateBackend.cancelStreamRegistry.registerCloseable(outputStream);

				byte[] buffer = new byte[8 * 1024];
				while (true) {
					int numBytes = inputStream.read(buffer);
					if (numBytes == -1) {
						break;
					}

					outputStream.write(buffer, 0, numBytes);
				}
			} finally {
				if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
					inputStream.close();
				}

				if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
					outputStream.close();
				}
			}
		}
	}

	// ------------------------------------------------------------------------
	//  State factories
	// ------------------------------------------------------------------------

	/**
	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
	 *
	 * <p>When restoring from a snapshot, we don't restore the individual k/v states, just the global RocksDB database and
	 * the list of k/v state information. When a k/v state is first requested we check here whether we
	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
	 * or create a new one if it does not exist.
	 */
	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
		StateDescriptor<?, S> stateDesc,
		TypeSerializer<N> namespaceSerializer,
		@Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {

		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
			kvStateInformation.get(stateDesc.getName());

		RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
		if (stateInfo != null) {

			StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());

			Preconditions.checkState(
				restoredMetaInfoSnapshot != null,
				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
					" but its corresponding restored snapshot cannot be found.");

			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
				restoredMetaInfoSnapshot,
				namespaceSerializer,
				stateDesc,
				snapshotTransformer);

			stateInfo.f1 = newMetaInfo;
		} else {
			String stateName = stateDesc.getName();

			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
				stateDesc.getType(),
				stateName,
				namespaceSerializer,
				stateDesc.getSerializer(),
				snapshotTransformer);

			ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);

			stateInfo = Tuple2.of(columnFamily, newMetaInfo);
			kvStateInformation.put(stateDesc.getName(), stateInfo);
		}

		return Tuple2.of(stateInfo.f0, newMetaInfo);
	}

	/**
	 * Creates a column family handle for use with a k/v state.
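	 *
	 * <p>A note on semantics: the TTL applied here is RocksDB's {@code TtlDB} time-to-live, which removes
	 * entries older than {@code ttl} seconds during compaction only. Expiry is therefore best-effort;
	 * stale values may remain readable until the next compaction (this is documented TtlDB behavior,
	 * not a guarantee made by this class).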
	 */
	private ColumnFamilyHandle createColumnFamilyWithTtl(String stateName) {
		byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
		Preconditions.checkState(!Arrays.equals(TtlDB.DEFAULT_COLUMN_FAMILY, nameBytes),
			"The chosen state name 'default' collides with the name of the default column family!");

		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);

		try {
			return db.createColumnFamilyWithTtl(columnDescriptor, ttl);
		} catch (RocksDBException e) {
			throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
		}
	}

	@Override
	@Nonnull
	public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
		@Nonnull TypeSerializer<N> namespaceSerializer,
		@Nonnull StateDescriptor<S, SV> stateDesc,
		@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
		if (stateFactory == null) {
			String message = String.format("State %s is not supported by %s",
				stateDesc.getClass(), this.getClass());
			throw new FlinkRuntimeException(message);
		}
		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult = tryRegisterKvStateInformation(
			stateDesc, namespaceSerializer, getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
		return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
	}

	@SuppressWarnings("unchecked")
	private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(
		StateDescriptor<?, SV> stateDesc,
		StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
		if (stateDesc instanceof ListStateDescriptor) {
			Optional<StateSnapshotTransformer<SEV>> original = snapshotTransformFactory.createForDeserializedState();
			return original.map(est -> createRocksDBListStateTransformer(stateDesc, est)).orElse(null);
		} else if (stateDesc instanceof MapStateDescriptor) {
			Optional<StateSnapshotTransformer<byte[]>> original = snapshotTransformFactory.createForSerializedState();
			return (StateSnapshotTransformer<SV>) original
				.map(RocksDBMapState.StateSnapshotTransformerWrapper::new).orElse(null);
		} else {
			Optional<StateSnapshotTransformer<byte[]>> original = snapshotTransformFactory.createForSerializedState();
			return (StateSnapshotTransformer<SV>) original.orElse(null);
		}
	}

	@SuppressWarnings("unchecked")
	private <SV, SEV> StateSnapshotTransformer<SV> createRocksDBListStateTransformer(
		StateDescriptor<?, SV> stateDesc,
		StateSnapshotTransformer<SEV> elementTransformer) {
		return (StateSnapshotTransformer<SV>) new RocksDBListState.StateSnapshotTransformerWrapper<>(
			elementTransformer, ((ListStateDescriptor<SEV>) stateDesc).getElementSerializer());
	}

	/**
	 * Only visible for testing, DO NOT USE.
	 */
	public File getInstanceBasePath() {
		return instanceBasePath;
	}

	@Override
	public boolean supportsAsynchronousSnapshots() {
		return true;
	}

	@VisibleForTesting
	@SuppressWarnings("unchecked")
	@Override
	public int numKeyValueStateEntries() {
		int count = 0;

		for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : kvStateInformation.values()) {
			//TODO maybe filterOrTransform only for k/v states
			try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
				rocksIterator.seekToFirst();

				while (rocksIterator.isValid()) {
					count++;
					rocksIterator.next();
				}
			}
		}

		return count;
	}

	/**
	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
	 * The resulting iteration sequence is ordered by (key-group, kv-state).
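	 *
	 * <p>For example, with two k/v states s0, s1 and key-groups 0 and 1, the merged sequence visits the
	 * entries in the order (key-group 0, s0), (key-group 0, s1), (key-group 1, s0), (key-group 1, s1),
	 * skipping any (key-group, state) combination that holds no data.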
	 */
	@VisibleForTesting
	static class RocksDBMergeIterator implements AutoCloseable {

		private final PriorityQueue<MergeIterator> heap;
		private final int keyGroupPrefixByteCount;
		private boolean newKeyGroup;
		private boolean newKVState;
		private boolean valid;

		MergeIterator currentSubIterator;

		private static final List<Comparator<MergeIterator>> COMPARATORS;

		static {
			int maxBytes = 2;
			COMPARATORS = new ArrayList<>(maxBytes);
			for (int i = 0; i < maxBytes; ++i) {
				final int currentBytes = i + 1;
				COMPARATORS.add((o1, o2) -> {
					int arrayCmpRes = compareKeyGroupsForByteArrays(
						o1.currentKey, o2.currentKey, currentBytes);
					return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
				});
			}
		}

		RocksDBMergeIterator(
			List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
			final int keyGroupPrefixByteCount) {
			Preconditions.checkNotNull(kvStateIterators);
			Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);

			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;

			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);

			if (kvStateIterators.size() > 0) {
				PriorityQueue<MergeIterator> iteratorPriorityQueue =
					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);

				for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
					final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
					rocksIterator.seekToFirst();
					if (rocksIterator.isValid()) {
						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
					} else {
						IOUtils.closeQuietly(rocksIterator);
					}
				}

				kvStateIterators.clear();

				this.heap = iteratorPriorityQueue;
				this.valid = !heap.isEmpty();
				this.currentSubIterator = heap.poll();
			} else {
				// creating a PriorityQueue of size 0 results in an exception.
				this.heap = null;
				this.valid = false;
			}

			this.newKeyGroup = true;
			this.newKVState = true;
		}

		/**
		 * Advances the iterator. Should only be called if {@link #isValid()} returned true. Validity can only
		 * change after calls to {@link #next()}.
		 */
		public void next() {
			newKeyGroup = false;
			newKVState = false;

			final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
			rocksIterator.next();

			byte[] oldKey = currentSubIterator.getCurrentKey();
			if (rocksIterator.isValid()) {

				currentSubIterator.currentKey = rocksIterator.key();

				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
					heap.offer(currentSubIterator);
					currentSubIterator = heap.poll();
					newKVState = currentSubIterator.getIterator() != rocksIterator;
					detectNewKeyGroup(oldKey);
				}
			} else {
				IOUtils.closeQuietly(rocksIterator);

				if (heap.isEmpty()) {
					currentSubIterator = null;
					valid = false;
				} else {
					currentSubIterator = heap.poll();
					newKVState = true;
					detectNewKeyGroup(oldKey);
				}
			}
		}

		private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
			return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
		}

		private void detectNewKeyGroup(byte[] oldKey) {
			if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
				newKeyGroup = true;
			}
		}

		/**
		 * @return key-group for the current key
		 */
		public int keyGroup() {
			int result = 0;
			//big endian decode
			for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
				result <<= 8;
				result |= (currentSubIterator.currentKey[i] & 0xFF);
			}
			return result;
		}

		public byte[] key() {
			return currentSubIterator.getCurrentKey();
		}

		public byte[] value() {
			return currentSubIterator.getIterator().value();
		}

		/**
		 * @return Id of K/V state to which the current key belongs.
		 */
		public int kvStateId() {
			return currentSubIterator.getKvStateId();
		}

		/**
		 * Indicates if the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
		 * @return true iff the current key belongs to a different k/v-state than its predecessor.
		 */
		public boolean isNewKeyValueState() {
			return newKVState;
		}

		/**
		 * Indicates if the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
		 * @return true iff the current key belongs to a different key-group than its predecessor.
		 */
		public boolean isNewKeyGroup() {
			return newKeyGroup;
		}

		/**
		 * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
		 * {@link #next()} should only be called if valid returned true. Should be checked after each call to
		 * {@link #next()} before accessing iterator state.
		 * @return True iff this iterator is valid.
		 */
		public boolean isValid() {
			return valid;
		}

		private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
			for (int i = 0; i < len; ++i) {
				int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
				if (diff != 0) {
					return diff;
				}
			}
			return 0;
		}

		@Override
		public void close() {
			IOUtils.closeQuietly(currentSubIterator);
			currentSubIterator = null;

			// the heap is null if this iterator was constructed without any sub-iterators
			if (heap != null) {
				IOUtils.closeAllQuietly(heap);
				heap.clear();
			}
		}
	}

	/**
	 * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
	 * Used by {@link RocksDBMergeIterator}.
	 */
	@VisibleForTesting
	protected static final class MergeIterator implements AutoCloseable {

		/**
		 * @param iterator The {@link RocksIteratorWrapper} to wrap.
		 * @param kvStateId Id of the K/V state to which this iterator belongs.
		 */
		MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
			this.iterator = Preconditions.checkNotNull(iterator);
			this.currentKey = iterator.key();
			this.kvStateId = kvStateId;
		}

		private final RocksIteratorWrapper iterator;
		private byte[] currentKey;
		private final int kvStateId;

		public byte[] getCurrentKey() {
			return currentKey;
		}

		public void setCurrentKey(byte[] currentKey) {
			this.currentKey = currentKey;
		}

		public RocksIteratorWrapper getIterator() {
			return iterator;
		}

		public int getKvStateId() {
			return kvStateId;
		}

		@Override
		public void close() {
			IOUtils.closeQuietly(iterator);
		}
	}

	private static final class TransformingRocksIteratorWrapper extends RocksIteratorWrapper {
		@Nonnull
		private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
		private byte[] current;

		public TransformingRocksIteratorWrapper(
			@Nonnull RocksIterator iterator,
			@Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
			super(iterator);
			this.stateSnapshotTransformer = stateSnapshotTransformer;
		}

		@Override
		public void seekToFirst() {
			super.seekToFirst();
			filterOrTransform(super::next);
		}

		@Override
		public void seekToLast() {
			super.seekToLast();
			filterOrTransform(super::prev);
		}

		@Override
		public void next() {
			super.next();
			filterOrTransform(super::next);
		}

		@Override
		public void prev() {
			super.prev();
			filterOrTransform(super::prev);
		}

		private void filterOrTransform(Runnable advance) {
			// skip over entries that the transformer filters out entirely; cache the transformed value
			while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
				advance.run();
			}
		}

		@Override
		public byte[] value() {
			if (!isValid()) {
				throw new IllegalStateException("value() method cannot be called if isValid() is false");
			}
			return current;
		}
	}

	/**
	 * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
	 * is not thread safe.
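	 *
	 * <p>A minimal sketch of the intended use, assuming an open iterator for the state's column
	 * family and the serialized namespace bytes (the surrounding variable names are illustrative):
	 * <pre>{@code
	 * try (RocksIteratorForKeysWrapper<String> keys = new RocksIteratorForKeysWrapper<>(
	 *         iterator, "my-state", keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, namespaceBytes)) {
	 *     while (keys.hasNext()) {
	 *         String key = keys.next();
	 *         // use the deserialized key
	 *     }
	 * }
	 * }</pre>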
	 *
	 * @param <K> the type of the iterated objects, which are keys in RocksDB.
	 */
	static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
		private final RocksIteratorWrapper iterator;
		private final String state;
		private final TypeSerializer<K> keySerializer;
		private final int keyGroupPrefixBytes;
		private final byte[] namespaceBytes;
		private final boolean ambiguousKeyPossible;
		private K nextKey;
		private K previousKey;

		RocksIteratorForKeysWrapper(
			RocksIteratorWrapper iterator,
			String state,
			TypeSerializer<K> keySerializer,
			int keyGroupPrefixBytes,
			boolean ambiguousKeyPossible,
			byte[] namespaceBytes) {
			this.iterator = Preconditions.checkNotNull(iterator);
			this.state = Preconditions.checkNotNull(state);
			this.keySerializer = Preconditions.checkNotNull(keySerializer);
			this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
			this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
			this.nextKey = null;
			this.previousKey = null;
			this.ambiguousKeyPossible = ambiguousKeyPossible;
		}

		@Override
		public boolean hasNext() {
			try {
				while (nextKey == null && iterator.isValid()) {

					byte[] key = iterator.key();

					ByteArrayInputStreamWithPos inputStream =
						new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);

					DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);

					K value = RocksDBKeySerializationUtils.readKey(
						keySerializer,
						inputStream,
						dataInput,
						ambiguousKeyPossible);

					int namespaceByteStartPos = inputStream.getPosition();

					if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
						previousKey = value;
						nextKey = value;
					}
					iterator.next();
				}
			} catch (Exception e) {
				throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
			}
			return nextKey != null;
		}

		@Override
		public K next() {
			if (!hasNext()) {
				throw new NoSuchElementException("Failed to access state [" + state + "]");
			}

			K tmpKey = nextKey;
			nextKey = null;
			return tmpKey;
		}

		private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
			final int namespaceBytesLength = namespaceBytes.length;
			final int basicLength = namespaceBytesLength + beginPos;
			if (key.length >= basicLength) {
				for (int i = 0; i < namespaceBytesLength; ++i) {
					if (key[beginPos + i] != namespaceBytes[i]) {
						return false;
					}
				}
				return true;
			}
			return false;
		}

		@Override
		public void close() {
			iterator.close();
		}
	}

	private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {

		@Override
		public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
			long checkpointId,
			long timestamp,
			CheckpointStreamFactory primaryStreamFactory,
			CheckpointOptions checkpointOptions) throws Exception {

			long startTime = System.currentTimeMillis();

			if (kvStateInformation.isEmpty()) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
						timestamp);
				}

				return DoneFuture.of(SnapshotResult.empty());
			}
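			// Choose how the snapshot stream is created: with local recovery enabled (and not a
			// savepoint), a duplicating stream writes the snapshot both to the primary checkpoint
			// storage and to the local recovery directory; otherwise a simple stream writes only
			// to the primary storage.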
			final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
				localRecoveryConfig.isLocalRecoveryEnabled() &&
					(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?

					() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
						checkpointId,
						CheckpointedStateScope.EXCLUSIVE,
						primaryStreamFactory,
						localRecoveryConfig.getLocalStateDirectoryProvider()) :

					() -> CheckpointStreamWithResultProvider.createSimpleStream(
						CheckpointedStateScope.EXCLUSIVE,
						primaryStreamFactory);

			final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();

			final RocksDBFullSnapshotOperation<K> snapshotOperation =
				new RocksDBFullSnapshotOperation<>(
					RocksDBKeyedStateBackend.this,
					supplier,
					snapshotCloseableRegistry);

			snapshotOperation.takeDBSnapShot();

			// implementation of the async IO operation, based on FutureTask
			AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable =
				new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {

					@Override
					protected void acquireResources() throws Exception {
						cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
						snapshotOperation.openCheckpointStream();
					}

					@Override
					protected void releaseResources() throws Exception {
						closeLocalRegistry();
						releaseSnapshotOperationResources();
					}

					private void releaseSnapshotOperationResources() {
						// hold the db lock while operating on the db to guard us against async db disposal
						snapshotOperation.releaseSnapshotResources();
					}

					@Override
					protected void stopOperation() throws Exception {
						closeLocalRegistry();
					}

					private void closeLocalRegistry() {
						if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
							try {
								snapshotCloseableRegistry.close();
							} catch (Exception ex) {
								LOG.warn("Error closing local registry", ex);
							}
						}
					}

					@Nonnull
					@Override
					public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
						long startTime = System.currentTimeMillis();

						if (isStopped()) {
							throw new IOException("RocksDB closed.");
						}

						snapshotOperation.writeDBSnapshot();

						LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
							primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));

						return snapshotOperation.getSnapshotResultStateHandle();
					}
				};

			LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
				primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));

			return AsyncStoppableTaskWithCallback.from(ioCallable);
		}
	}

	private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {

		private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;

		public IncrementalSnapshotStrategy() {
			this.savepointDelegate = new FullSnapshotStrategy();
		}

		@Override
		public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
			long checkpointId,
			long checkpointTimestamp,
			CheckpointStreamFactory checkpointStreamFactory,
			CheckpointOptions checkpointOptions) throws Exception {

			// for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
			if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
				return savepointDelegate.performSnapshot(
					checkpointId, checkpointTimestamp, checkpointStreamFactory, checkpointOptions);
			}

			if (db == null) {
				throw new IOException("RocksDB closed.");
			}

			if (kvStateInformation.isEmpty()) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
						checkpointTimestamp);
				}

				return DoneFuture.of(SnapshotResult.empty());
			}
Returning null.", checkpointTimestamp); } return DoneFuture.of(SnapshotResult.empty()); } SnapshotDirectory snapshotDirectory; if (localRecoveryConfig.isLocalRecoveryEnabled()) { // create a "permanent" snapshot directory for local recovery. LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); if (directory.exists()) { FileUtils.deleteDirectory(directory); } if (!directory.mkdirs()) { throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory); } // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints. File rdbSnapshotDir = new File(directory, "rocks_db"); Path path = new Path(rdbSnapshotDir.toURI()); // create a "permanent" snapshot directory because local recovery is active. snapshotDirectory = SnapshotDirectory.permanent(path); } else { // create a "temporary" snapshot directory because local recovery is inactive. Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); snapshotDirectory = SnapshotDirectory.temporary(path); } final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = new RocksDBIncrementalSnapshotOperation<>( RocksDBKeyedStateBackend.this, checkpointStreamFactory, snapshotDirectory, checkpointId); try { snapshotOperation.takeSnapshot(); } catch (Exception e) { snapshotOperation.stop(); snapshotOperation.releaseResources(true); throw e; } return new FutureTask<SnapshotResult<KeyedStateHandle>>( snapshotOperation::runSnapshot ) { @Override public boolean cancel(boolean mayInterruptIfRunning) { snapshotOperation.stop(); return super.cancel(mayInterruptIfRunning); } @Override protected void done() { snapshotOperation.releaseResources(isCancelled()); } }; } } /** * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend. */ @VisibleForTesting static class RocksDBFullSnapshotOperation<K> extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> { static final int FIRST_BIT_IN_BYTE_MASK = 0x80; static final int END_OF_KEY_GROUP_MARK = 0xFFFF; private final RocksDBKeyedStateBackend<K> stateBackend; private final KeyGroupRangeOffsets keyGroupRangeOffsets; private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier; private final CloseableRegistry snapshotCloseableRegistry; private final ResourceGuard.Lease dbLease; private Snapshot snapshot; private ReadOptions readOptions; /** * The state meta data. */ private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; /** * The copied column handle. */ private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> copiedMeta; private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators; private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider; private DataOutputView outputView; RocksDBFullSnapshotOperation( RocksDBKeyedStateBackend<K> stateBackend, SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, CloseableRegistry registry) throws IOException { this.stateBackend = stateBackend; this.checkpointStreamSupplier = checkpointStreamSupplier; this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); this.snapshotCloseableRegistry = registry; this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); } /** * 1) Create a snapshot object from RocksDB. 
		 */
		public void takeDBSnapShot() {
			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");

			this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());

			this.copiedMeta = new ArrayList<>(stateBackend.kvStateInformation.size());

			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
				stateBackend.kvStateInformation.values()) {
				// snapshot meta info
				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
				this.copiedMeta.add(tuple2);
			}
			this.snapshot = stateBackend.db.getSnapshot();
		}

		/**
		 * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
		 *
		 * @throws Exception if the checkpoint stream cannot be opened.
		 */
		public void openCheckpointStream() throws Exception {
			Preconditions.checkArgument(checkpointStreamWithResultProvider == null,
				"Output stream for snapshot is already set.");

			checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
			snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
			outputView = new DataOutputViewStreamWrapper(
				checkpointStreamWithResultProvider.getCheckpointOutputStream());
		}

		/**
		 * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
		 *
		 * @throws IOException if writing the snapshot fails.
		 */
		public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {

			if (null == snapshot) {
				throw new IOException("No snapshot available. Might be released due to cancellation.");
			}

			Preconditions.checkNotNull(checkpointStreamWithResultProvider, "No output stream to write snapshot.");
			writeKVStateMetaData();
			writeKVStateData();
		}

		/**
		 * 4) Returns a snapshot result for the completed snapshot.
		 *
		 * @return snapshot result for the completed snapshot.
		 */
		@Nonnull
		public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException {

			if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {

				SnapshotResult<StreamStateHandle> res =
					checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
				checkpointStreamWithResultProvider = null;
				return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(res, keyGroupRangeOffsets);
			}

			return SnapshotResult.empty();
		}

		/**
		 * 5) Release the snapshot object for RocksDB and clean up.
		 */
		public void releaseSnapshotResources() {

			checkpointStreamWithResultProvider = null;

			if (null != kvStateIterators) {
				for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
					IOUtils.closeQuietly(kvStateIterator.f0);
				}
				kvStateIterators = null;
			}

			if (null != snapshot) {
				if (null != stateBackend.db) {
					stateBackend.db.releaseSnapshot(snapshot);
				}
				IOUtils.closeQuietly(snapshot);
				snapshot = null;
			}

			if (null != readOptions) {
				IOUtils.closeQuietly(readOptions);
				readOptions = null;
			}
			this.dbLease.close();
		}

		private void writeKVStateMetaData() throws IOException {

			this.kvStateIterators = new ArrayList<>(copiedMeta.size());

			int kvStateId = 0;

			// retrieve iterators for these k/v states
			readOptions = new ReadOptions();
			readOptions.setSnapshot(snapshot);

			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : copiedMeta) {
				RocksIteratorWrapper rocksIteratorWrapper =
					getRocksIterator(stateBackend.db, tuple2.f0, tuple2.f1, readOptions);
				kvStateIterators.add(new Tuple2<>(rocksIteratorWrapper, kvStateId));
				++kvStateId;
			}

			KeyedBackendSerializationProxy<K> serializationProxy =
				new KeyedBackendSerializationProxy<>(
					// TODO: this code assumes that writing a serializer is threadsafe, we should support to
					// get a serialized form already at state registration time in the future
					stateBackend.getKeySerializer(),
					stateMetaInfoSnapshots,
					!Objects.equals(
						UncompressedStreamCompressionDecorator.INSTANCE,
						stateBackend.keyGroupCompressionDecorator));

			serializationProxy.write(outputView);
		}

		private void writeKVStateData() throws IOException, InterruptedException {

			byte[] previousKey = null;
			byte[] previousValue = null;
			DataOutputView kgOutView = null;
			OutputStream kgOutStream = null;
			CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
				checkpointStreamWithResultProvider.getCheckpointOutputStream();

			try {
				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {

					// handover complete, null out to prevent double close
					kvStateIterators = null;

					// preamble: setup with first key-group as our lookahead
					if (mergeIterator.isValid()) {
						// begin first key-group by recording the offset
						keyGroupRangeOffsets.setKeyGroupOffset(
							mergeIterator.keyGroup(),
							checkpointOutputStream.getPos());
						// write the k/v-state id as metadata
						kgOutStream = stateBackend.keyGroupCompressionDecorator.
							decorateWithCompression(checkpointOutputStream);
						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
						kgOutView.writeShort(mergeIterator.kvStateId());
						previousKey = mergeIterator.key();
						previousValue = mergeIterator.value();
						mergeIterator.next();
					}

					// main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
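					// Stream layout produced by the loop below: within each key-group, a k/v-state
					// section starts with a 2-byte k/v-state id followed by its serialized key/value
					// pairs. Before a section changes, the "meta data follows" flag is set in the
					// first key byte of the last written pair; the reader then expects either the
					// next k/v-state id or END_OF_KEY_GROUP_MARK.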
					while (mergeIterator.isValid()) {

						assert (!hasMetaDataFollowsFlag(previousKey));

						// set signal in first key byte that meta data will follow in the stream after this k/v pair
						if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {

							// be cooperative and check for interruption from time to time in the hot loop
							checkInterrupted();

							setMetaDataFollowsFlagInKey(previousKey);
						}

						writeKeyValuePair(previousKey, previousValue, kgOutView);

						// write meta data if we have to
						if (mergeIterator.isNewKeyGroup()) {
							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
							kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
							// this only closes the compression-decorating stream, not the checkpoint stream underneath
							kgOutStream.close();
							// begin new key-group
							keyGroupRangeOffsets.setKeyGroupOffset(
								mergeIterator.keyGroup(),
								checkpointOutputStream.getPos());
							// write the k/v-state id
							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
							kgOutStream = stateBackend.keyGroupCompressionDecorator.
								decorateWithCompression(checkpointOutputStream);
							kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
							kgOutView.writeShort(mergeIterator.kvStateId());
						} else if (mergeIterator.isNewKeyValueState()) {
							// write the k/v-state id
							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
							kgOutView.writeShort(mergeIterator.kvStateId());
						}

						// request next k/v pair
						previousKey = mergeIterator.key();
						previousValue = mergeIterator.value();
						mergeIterator.next();
					}
				}

				// epilogue: write last key-group
				if (previousKey != null) {
					assert (!hasMetaDataFollowsFlag(previousKey));
					setMetaDataFollowsFlagInKey(previousKey);
					writeKeyValuePair(previousKey, previousValue, kgOutView);
					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
					kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
					// this only closes the compression-decorating stream, not the checkpoint stream underneath
					kgOutStream.close();
					kgOutStream = null;
				}

			} finally {
				// this only closes the compression-decorating stream, not the checkpoint stream underneath
				IOUtils.closeQuietly(kgOutStream);
			}
		}

		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
		}

		static void setMetaDataFollowsFlagInKey(byte[] key) {
			key[0] |= FIRST_BIT_IN_BYTE_MASK;
		}

		static void clearMetaDataFollowsFlag(byte[] key) {
			key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
		}

		static boolean hasMetaDataFollowsFlag(byte[] key) {
			return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
		}

		private static void checkInterrupted() throws InterruptedException {
			if (Thread.currentThread().isInterrupted()) {
				throw new InterruptedException("RocksDB snapshot interrupted.");
			}
		}

		@Override
		protected void acquireResources() throws Exception {
			stateBackend.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
			openCheckpointStream();
		}

		@Override
		protected void releaseResources() {
			closeLocalRegistry();
			releaseSnapshotOperationResources();
		}

		private void releaseSnapshotOperationResources() {
			// hold the db lock while operating on the db to guard us against async db disposal
			releaseSnapshotResources();
		}

		@Override
		protected void stopOperation() {
			closeLocalRegistry();
		}

		private void closeLocalRegistry() {
			if (stateBackend.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
				try {
					snapshotCloseableRegistry.close();
				} catch (Exception ex) {
					LOG.warn("Error closing local registry", ex);
				}
			}
		}
		@Nonnull
		@Override
		public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
			long startTime = System.currentTimeMillis();

			if (isStopped()) {
				throw new IOException("RocksDB closed.");
			}

			writeDBSnapshot();

			LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
				checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime));

			return getSnapshotResultStateHandle();
		}
	}

	/**
	 * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
	 */
	private static final class RocksDBIncrementalSnapshotOperation<K> {

		/** The backend which we snapshot. */
		private final RocksDBKeyedStateBackend<K> stateBackend;

		/** Stream factory that creates the output streams to DFS. */
		private final CheckpointStreamFactory checkpointStreamFactory;

		/** Id for the current checkpoint. */
		private final long checkpointId;

		/** All sst files that were part of the last previously completed checkpoint. */
		private Set<StateHandleID> baseSstFiles;

		/** The state meta data. */
		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();

		/** Local directory for the RocksDB native backup. */
		private SnapshotDirectory localBackupDirectory;

		// Registry for all opened i/o streams
		private final CloseableRegistry closeableRegistry = new CloseableRegistry();

		// new sst files since the last completed checkpoint
		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();

		// handles to the misc files in the current snapshot
		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

		// This lease protects from concurrent disposal of the native rocksdb instance.
		private final ResourceGuard.Lease dbLease;

		private SnapshotResult<StreamStateHandle> metaStateHandle = null;

		private RocksDBIncrementalSnapshotOperation(
			RocksDBKeyedStateBackend<K> stateBackend,
			CheckpointStreamFactory checkpointStreamFactory,
			SnapshotDirectory localBackupDirectory,
			long checkpointId) throws IOException {

			this.stateBackend = stateBackend;
			this.checkpointStreamFactory = checkpointStreamFactory;
			this.checkpointId = checkpointId;
			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
			this.localBackupDirectory = localBackupDirectory;
		}

		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
			FSDataInputStream inputStream = null;
			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;

			try {
				final byte[] buffer = new byte[8 * 1024];

				FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
				inputStream = backupFileSystem.open(filePath);
				closeableRegistry.registerCloseable(inputStream);

				outputStream = checkpointStreamFactory
					.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
				closeableRegistry.registerCloseable(outputStream);

				while (true) {
					int numBytes = inputStream.read(buffer);

					if (numBytes == -1) {
						break;
					}

					outputStream.write(buffer, 0, numBytes);
				}

				StreamStateHandle result = null;
				if (closeableRegistry.unregisterCloseable(outputStream)) {
					result = outputStream.closeAndGetHandle();
					outputStream = null;
				}
				return result;

			} finally {

				if (closeableRegistry.unregisterCloseable(inputStream)) {
					inputStream.close();
				}

				if (closeableRegistry.unregisterCloseable(outputStream)) {
					outputStream.close();
				}
			}
		}
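		// Lifecycle sketch, as driven by IncrementalSnapshotStrategy above: takeSnapshot() runs
		// synchronously and hard-links the live RocksDB files into the local backup directory;
		// runSnapshot() then materializes the meta data and the new sst/misc files asynchronously;
		// stop() cancels in-flight i/o, and releaseResources(...) cleans up afterwards.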
		@Nonnull
		private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {

			LocalRecoveryConfig localRecoveryConfig = stateBackend.localRecoveryConfig;

			CheckpointStreamWithResultProvider streamWithResultProvider =
				localRecoveryConfig.isLocalRecoveryEnabled() ?

					CheckpointStreamWithResultProvider.createDuplicatingStream(
						checkpointId,
						CheckpointedStateScope.EXCLUSIVE,
						checkpointStreamFactory,
						localRecoveryConfig.getLocalStateDirectoryProvider()) :

					CheckpointStreamWithResultProvider.createSimpleStream(
						CheckpointedStateScope.EXCLUSIVE,
						checkpointStreamFactory);

			try {
				closeableRegistry.registerCloseable(streamWithResultProvider);

				// no need for compression scheme support because sst-files are already compressed
				KeyedBackendSerializationProxy<K> serializationProxy =
					new KeyedBackendSerializationProxy<>(
						stateBackend.keySerializer,
						stateMetaInfoSnapshots,
						false);

				DataOutputView out =
					new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());

				serializationProxy.write(out);

				if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
					SnapshotResult<StreamStateHandle> result =
						streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
					streamWithResultProvider = null;
					return result;
				} else {
					throw new IOException("Stream already closed and cannot return a handle.");
				}
			} finally {
				if (streamWithResultProvider != null) {
					if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
						IOUtils.closeQuietly(streamWithResultProvider);
					}
				}
			}
		}

		void takeSnapshot() throws Exception {

			final long lastCompletedCheckpoint;

			// use the last completed checkpoint as the comparison base.
			synchronized (stateBackend.materializedSstFiles) {
				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
			}

			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);

			// save meta data
			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry :
				stateBackend.kvStateInformation.entrySet()) {
				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
			}

			LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);

			if (localBackupDirectory.exists()) {
				throw new IllegalStateException("Unexpected existence of the backup directory.");
			}

			// create hard links of living files in the snapshot path
			try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) {
				checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
			}
		}

		@Nonnull
		SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {

			stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);

			// write meta data
			metaStateHandle = materializeMetaData();

			// sanity checks - they should never fail
			Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
			Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
				"Metadata for job manager was not properly created.");

			// write state data
			Preconditions.checkState(localBackupDirectory.exists());

			FileStatus[] fileStatuses = localBackupDirectory.listStatus();
			if (fileStatuses != null) {
				for (FileStatus fileStatus : fileStatuses) {
					final Path filePath = fileStatus.getPath();
					final String fileName = filePath.getName();
					final StateHandleID stateHandleID = new StateHandleID(fileName);

					if (fileName.endsWith(SST_FILE_SUFFIX)) {
						final boolean existsAlready =
							baseSstFiles != null && baseSstFiles.contains(stateHandleID);

						if (existsAlready) {
							// we introduce a placeholder state handle, that is replaced with the
							// original from the shared state registry (created from a previous checkpoint)
							sstFiles.put(
								stateHandleID,
								new PlaceholderStreamStateHandle());
						} else {
							sstFiles.put(stateHandleID, materializeStateData(filePath));
						}
					} else {
						StreamStateHandle fileHandle = materializeStateData(filePath);
						miscFiles.put(stateHandleID, fileHandle);
					}
				}
			}

			synchronized (stateBackend.materializedSstFiles) {
				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
			}

			IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
				stateBackend.backendUID,
				stateBackend.keyGroupRange,
				checkpointId,
				sstFiles,
				miscFiles,
				metaStateHandle.getJobManagerOwnedSnapshot());

			StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot();
			DirectoryStateHandle directoryStateHandle = null;

			try {
				directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
			} catch (IOException ex) {

				Exception collector = ex;

				try {
					taskLocalSnapshotMetaDataStateHandle.discardState();
				} catch (Exception discardEx) {
					collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
				}

				LOG.warn("Problem with local state snapshot.", collector);
			}

			if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {

				IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
					new IncrementalLocalKeyedStateHandle(
						stateBackend.backendUID,
						checkpointId,
						directoryStateHandle,
						stateBackend.keyGroupRange,
						taskLocalSnapshotMetaDataStateHandle,
						sstFiles.keySet());
				return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
			} else {
				return SnapshotResult.of(jmIncrementalKeyedStateHandle);
			}
		}

		void stop() {
			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
				try {
					closeableRegistry.close();
				} catch (IOException e) {
					LOG.warn("Could not properly close io streams.", e);
				}
			}
		}

		void releaseResources(boolean canceled) {

			dbLease.close();

			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
				try {
					closeableRegistry.close();
				} catch (IOException e) {
					LOG.warn("Exception on closing registry.", e);
				}
			}

			try {
				if (localBackupDirectory.exists()) {
					LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
					boolean cleanupOk = localBackupDirectory.cleanup();

					if (!cleanupOk) {
						LOG.debug("Could not properly cleanup local RocksDB backup directory.");
					}
				}
			} catch (IOException e) {
				LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
			}

			if (canceled) {
				Collection<StateObject> statesToDiscard =
					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());

				statesToDiscard.add(metaStateHandle);
				statesToDiscard.addAll(miscFiles.values());
				statesToDiscard.addAll(sstFiles.values());

				try {
					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
				} catch (Exception e) {
					LOG.warn("Could not properly discard states.", e);
				}

				if (localBackupDirectory.isSnapshotCompleted()) {
					try {
						DirectoryStateHandle directoryStateHandle =
							localBackupDirectory.completeSnapshotAndGetHandle();
						if (directoryStateHandle != null) {
							directoryStateHandle.discardState();
						}
					} catch (Exception e) {
						LOG.warn("Could not properly discard local state.", e);
					}
				}
			}
		}
	}

	public static RocksIteratorWrapper getRocksIterator(TtlDB db) {
		return new RocksIteratorWrapper(db.newIterator());
	}

	public static RocksIteratorWrapper getRocksIterator(
		TtlDB db,
		ColumnFamilyHandle columnFamilyHandle) {
		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
	}
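	// A minimal consumption sketch for the iterator helpers above and below (the column family
	// handle and the process(...) callback are hypothetical); the returned wrapper must be closed
	// to release the native iterator:
	//
	//   try (RocksIteratorWrapper it = getRocksIterator(db, columnFamilyHandle)) {
	//       for (it.seekToFirst(); it.isValid(); it.next()) {
	//           process(it.key(), it.value());
	//       }
	//   }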
	@SuppressWarnings("unchecked")
	private static RocksIteratorWrapper getRocksIterator(
		TtlDB db,
		ColumnFamilyHandle columnFamilyHandle,
		RegisteredStateMetaInfoBase metaInfo,
		ReadOptions readOptions) {
		StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
		if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
			stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>)
				((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer();
		}
		RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
		return stateSnapshotTransformer == null ?
			new RocksIteratorWrapper(rocksIterator) :
			new TransformingRocksIteratorWrapper(rocksIterator, stateSnapshotTransformer);
	}

	/**
	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
	 */
	class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {

		/** Default cache size per key-group. */
		//TODO make this configurable
		private static final int DEFAULT_CACHES_SIZE = 128;

		/** A shared buffer to serialize elements for the priority queue. */
		@Nonnull
		private final ByteArrayDataOutputView sharedElementOutView;

		/** A shared buffer to de-serialize elements for the priority queue. */
		@Nonnull
		private final ByteArrayDataInputView sharedElementInView;

		RocksDBPriorityQueueSetFactory() {
			this.sharedElementOutView = new ByteArrayDataOutputView();
			this.sharedElementInView = new ByteArrayDataInputView();
		}

		@Nonnull
		@Override
		public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
		create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

			final Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> metaInfoTuple =
				tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer);

			final ColumnFamilyHandle columnFamilyHandle = metaInfoTuple.f0;

			return new KeyGroupPartitionedPriorityQueue<>(
				KeyExtractorFunction.forKeyedObjects(),
				PriorityComparator.forPriorityComparableObjects(),
				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>>() {
					@Nonnull
					@Override
					public RocksDBCachingPriorityQueueSet<T> create(
						int keyGroupId,
						int numKeyGroups,
						@Nonnull KeyExtractorFunction<T> keyExtractor,
						@Nonnull PriorityComparator<T> elementPriorityComparator) {
						TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(DEFAULT_CACHES_SIZE);
						return new RocksDBCachingPriorityQueueSet<>(
							keyGroupId,
							keyGroupPrefixBytes,
							db,
							columnFamilyHandle,
							byteOrderedElementSerializer,
							sharedElementOutView,
							sharedElementInView,
							writeBatchWrapper,
							orderedSetCache
						);
					}
				},
				keyGroupRange,
				numberOfKeyGroups);
		}
	}

	@Nonnull
	private <T> Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tryRegisterPriorityQueueMetaInfo(
		@Nonnull String stateName,
		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> metaInfoTuple =
			kvStateInformation.get(stateName);

		if (metaInfoTuple == null) {
			final ColumnFamilyHandle columnFamilyHandle = createColumnFamilyWithTtl(stateName);

			RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
				new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);

			metaInfoTuple = new Tuple2<>(columnFamilyHandle, metaInfo);
			kvStateInformation.put(stateName, metaInfoTuple);
		} else {
			// TODO we implement the simple way of supporting the current functionality, mimicking keyed state
			// because this should be reworked in FLINK-9376 and then we should have a common algorithm over
			// StateMetaInfoSnapshot that avoids this code duplication.
			StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateName);

			Preconditions.checkState(
				restoredMetaInfoSnapshot != null,
				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
					" but its corresponding restored snapshot cannot be found.");

			StateMetaInfoSnapshot.CommonSerializerKeys serializerKey =
				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;

			TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.getTypeSerializer(serializerKey);

			if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
				CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
					metaInfoTypeSerializer,
					null,
					restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
					byteOrderedElementSerializer);

				if (compatibilityResult.isRequiresMigration()) {
					throw new FlinkRuntimeException(StateMigrationException.notSupported());
				}

				// update meta info with new serializer
				metaInfoTuple.f1 =
					new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
			}
		}

		return metaInfoTuple;
	}

	@Override
	public boolean requiresLegacySynchronousTimerSnapshots() {
		return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
	}
}