/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueConfig;
import org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory;
import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer;
import org.apache.flink.contrib.streaming.state.RocksDBStateDataTransferHelper;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBHeapTimersFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig;
import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

public class RocksDBKeyedStateBackendBuilder<K>
extends AbstractKeyedStateBackendBuilder<K> {
    /** Name of the child directory, below the instance base path, that holds the RocksDB instance. */
    static final String DB_INSTANCE_DIR_STRING = "db";
    /** Operator identifier; handed to the incremental restore operation. */
    private final String operatorIdentifier;
    /** Selects the priority-queue implementation (HEAP or ROCKSDB) and its settings. */
    private final RocksDBPriorityQueueConfig priorityQueueConfig;
    /** Local recovery configuration; handed to the snapshot strategies. */
    private final LocalRecoveryConfig localRecoveryConfig;
    /** Factory producing column family options for a given name; never null (checked in the constructor). */
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    /** Source of DB/read/write options and the write-buffer-manager capacity; closed if build() fails. */
    private final RocksDBResourceContainer optionsContainer;
    /** Base directory of this backend instance; deleted if build() fails. */
    private final File instanceBasePath;
    /** Directory of the RocksDB instance, derived from instanceBasePath via getInstanceRocksDBPath. */
    private final File instanceRocksDBPath;
    /** Metric group passed to the restore operations and the native metric monitor. */
    private final MetricGroup metricGroup;
    /** Initialization-metrics callback; only the incremental restore operation receives it. */
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    /** When true, build() creates an incremental snapshot strategy; otherwise a native full one. */
    private boolean enableIncrementalCheckpointing;
    /** Native metric options; a fresh default instance unless replaced through the setter. */
    private RocksDBNativeMetricOptions nativeMetricOptions;
    /** Thread count for state transfer; starts at the default of CHECKPOINT_TRANSFER_THREAD_NUM. */
    private int numberOfTransferingThreads;
    /** Write batch size in bytes; starts at the default of WRITE_BATCH_SIZE. */
    private long writeBatchSize = ((MemorySize)RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue()).getBytes();
    /** Test hook: when non-null, build() uses this DB and skips directory preparation and restore. */
    private RocksDB injectedTestDB;
    // The four restore tuning knobs below are only forwarded to the incremental restore operation.
    private boolean incrementalRestoreAsyncCompactAfterRescale = (Boolean)RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue();
    private boolean rescalingUseDeleteFilesInRange = (Boolean)RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue();
    private double overlapFractionThreshold = (Double)RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
    private boolean useIngestDbRestoreMode = (Boolean)RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue();
    /** Test hook: default column family handle used together with injectedTestDB. */
    private ColumnFamilyHandle injectedDefaultColumnFamilyHandle;
    /** Optional pre-built uploader; when null, build() creates one from the thread count and IO executor. */
    private RocksDBStateUploader injectRocksDBStateUploader;
    /** Manual compaction settings; never null (the setter rejects null). */
    private RocksDBManualCompactionConfig manualCompactionConfig = RocksDBManualCompactionConfig.getDefault();
    /** Executor for IO work; may be null if setIOExecutor was never called. */
    private ExecutorService ioExecutor;

    /**
     * Creates a builder for a RocksDB keyed state backend.
     *
     * <p>Arguments not stored by this class are forwarded to the superclass constructor. Incremental
     * checkpointing starts disabled, native metric options start as a default instance, and the
     * transfer thread count starts at the default of {@code CHECKPOINT_TRANSFER_THREAD_NUM}; all
     * three can be changed through the setters.
     *
     * @param instanceBasePath base directory of the backend instance; the RocksDB directory is derived from it
     * @param columnFamilyOptionsFactory factory for column family options, must not be null
     * @param stateHandles state handles to restore from
     */
    public RocksDBKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            File instanceBasePath,
            RocksDBResourceContainer optionsContainer,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            LocalRecoveryConfig localRecoveryConfig,
            RocksDBPriorityQueueConfig priorityQueueConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            MetricGroup metricGroup,
            StateBackend.CustomInitializationMetrics customInitializationMetrics,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            CloseableRegistry cancelStreamRegistry) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider, latencyTrackingStateConfig, stateHandles, keyGroupCompressionDecorator, cancelStreamRegistry);

        // Fail fast on a missing factory before wiring anything else.
        Preconditions.checkNotNull(columnFamilyOptionsFactory);
        this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;

        // Identity and configuration supplied by the caller.
        this.operatorIdentifier = operatorIdentifier;
        this.priorityQueueConfig = priorityQueueConfig;
        this.localRecoveryConfig = localRecoveryConfig;
        this.optionsContainer = optionsContainer;
        this.metricGroup = metricGroup;
        this.customInitializationMetrics = customInitializationMetrics;

        // Directory layout: the RocksDB instance lives in a child of the base path.
        this.instanceBasePath = instanceBasePath;
        this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath);

        // Defaults for the optional settings.
        this.enableIncrementalCheckpointing = false;
        this.nativeMetricOptions = new RocksDBNativeMetricOptions();
        this.numberOfTransferingThreads = (Integer)RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue();
    }

    /**
     * Test-only constructor that additionally injects a ready-made RocksDB instance and its default
     * column family handle. Delegates to the public constructor with a no-op initialization-metrics
     * callback.
     *
     * @param injectedTestDB database that {@code build()} uses instead of restoring one
     * @param injectedDefaultColumnFamilyHandle default column family handle belonging to {@code injectedTestDB}
     */
    @VisibleForTesting
    RocksDBKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            File instanceBasePath,
            RocksDBResourceContainer optionsContainer,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            LocalRecoveryConfig localRecoveryConfig,
            RocksDBPriorityQueueConfig rocksDBPriorityQueueConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            RocksDB injectedTestDB,
            ColumnFamilyHandle injectedDefaultColumnFamilyHandle,
            CloseableRegistry cancelStreamRegistry) {
        this(
                operatorIdentifier,
                userCodeClassLoader,
                instanceBasePath,
                optionsContainer,
                columnFamilyOptionsFactory,
                kvStateRegistry,
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                executionConfig,
                localRecoveryConfig,
                rocksDBPriorityQueueConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                metricGroup,
                (ignoredName, ignoredValue) -> {},
                stateHandles,
                keyGroupCompressionDecorator,
                cancelStreamRegistry);
        this.injectedDefaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle;
        this.injectedTestDB = injectedTestDB;
    }

    /**
     * Chooses the snapshot strategy created by {@code build()}: incremental when {@code true},
     * native full otherwise.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) {
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        return this;
    }

    /**
     * Replaces the native metric options that are passed to the restore operations and, for an
     * injected test DB, decide whether a native metric monitor is created.
     *
     * <p>No null check is performed here; {@code build()} dereferences the value when a test DB is
     * injected.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        this.nativeMetricOptions = nativeMetricOptions;
        return this;
    }

    /**
     * Sets the number of threads used for state transfer.
     *
     * @return this builder, for chaining
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if a state
     *     uploader has already been injected
     */
    RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(int numberOfTransferingThreads) {
        // An injected uploader carries its own threading setup, so the two settings are exclusive.
        boolean uploaderNotInjected = this.injectRocksDBStateUploader == null;
        Preconditions.checkState(uploaderNotInjected, (Object)"numberOfTransferingThreads can be set only when injectRocksDBStateUploader is null.");
        this.numberOfTransferingThreads = numberOfTransferingThreads;
        return this;
    }

    /**
     * Sets the write batch size in bytes.
     *
     * @param writeBatchSize batch size; must be zero or positive
     * @return this builder, for chaining
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, if the
     *     value is negative
     */
    RocksDBKeyedStateBackendBuilder<K> setWriteBatchSize(long writeBatchSize) {
        boolean nonNegative = writeBatchSize >= 0L;
        Preconditions.checkArgument(nonNegative, (Object)"Write batch size should be non negative.");
        this.writeBatchSize = writeBatchSize;
        return this;
    }

    /**
     * Injects a pre-built state uploader that {@code build()} uses instead of creating its own.
     *
     * <p>The uploader can be injected only once, and only while the transfer thread count still
     * equals the configured default. Note that "manually set" is detected purely by comparing against
     * the default value, so explicitly setting the default is indistinguishable from not setting it.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setRocksDBStateUploader(RocksDBStateUploader rocksDBStateUploader) {
        Preconditions.checkState(this.injectRocksDBStateUploader == null, (Object)"rocksDBStateUploader can be only set once");

        int defaultTransferThreads = (Integer)RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue();
        boolean threadCountUntouched = this.numberOfTransferingThreads == defaultTransferThreads;
        Preconditions.checkState(threadCountUntouched, (Object)"RocksDBStateUploader can only be set if numberOfTransferingThreads has not been manually set.");

        this.injectRocksDBStateUploader = rocksDBStateUploader;
        return this;
    }

    /**
     * Sets the overlap fraction threshold that is forwarded to the incremental restore operation.
     * The value is stored as given; no range validation happens here.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setOverlapFractionThreshold(double overlapFractionThreshold) {
        this.overlapFractionThreshold = overlapFractionThreshold;
        return this;
    }

    /**
     * Sets the async-compact-after-rescale flag that is forwarded to the incremental restore
     * operation.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setIncrementalRestoreAsyncCompactAfterRescale(boolean incrementalRestoreAsyncCompactAfterRescale) {
        this.incrementalRestoreAsyncCompactAfterRescale = incrementalRestoreAsyncCompactAfterRescale;
        return this;
    }

    /**
     * Sets the ingest-DB restore mode flag that is forwarded to the incremental restore operation.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) {
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
        return this;
    }

    /**
     * Sets the delete-files-in-range flag for rescaling that is forwarded to the incremental restore
     * operation.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(boolean rescalingUseDeleteFilesInRange) {
        this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange;
        return this;
    }

    /**
     * Sets the executor that is handed to the incremental restore operation, the state data transfer
     * helper and the manual compaction manager. The value is stored without a null check.
     *
     * @return this builder, for chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setIOExecutor(ExecutorService ioExecutor) {
        this.ioExecutor = ioExecutor;
        return this;
    }

    /**
     * Returns the RocksDB instance directory for a backend: the child named
     * {@link #DB_INSTANCE_DIR_STRING} below the given base path. No file system access is performed.
     *
     * @param instanceBasePath base directory of the backend instance
     * @return the {@code db} child of {@code instanceBasePath}
     */
    public static File getInstanceRocksDBPath(File instanceBasePath) {
        return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
    }

    /**
     * Ensures that {@code directory} exists and is a directory, creating it together with any missing
     * parents when necessary.
     *
     * @param directory the directory to verify or create
     * @throws IOException if the path exists but is not a directory, or if it could not be created
     */
    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (directory.exists()) {
            if (!directory.isDirectory()) {
                throw new IOException("Not a directory: " + directory);
            }
            return;
        }
        // mkdirs() also returns false when someone else created the directory between the exists()
        // check above and this call, so only fail if the directory is really not there afterwards.
        if (!directory.mkdirs() && !directory.isDirectory()) {
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", directory));
        }
    }

    /**
     * Builds the RocksDB keyed state backend.
     *
     * <p>With an injected test DB the database and default column family handle are used as given.
     * Otherwise the instance directories are prepared and a restore operation matching the restore
     * state handles opens (and possibly restores) the database. Afterwards the write batch wrapper,
     * key builder, snapshot strategy, manual compaction manager and priority queue factory are
     * created and handed to the new backend.
     *
     * <p>If any step fails, every resource created so far is closed quietly, the instance base path
     * is deleted on a best-effort basis, and the failure is rethrown.
     *
     * @return the fully wired backend
     * @throws BackendBuildingException if building fails; any other throwable is logged and wrapped
     */
    public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
        PriorityQueueSetFactory priorityQueueFactory;
        RocksDBManualCompactionManager manualCompactionManager;
        SerializedCompositeKeyBuilder sharedRocksKeyBuilder;
        RocksDBWriteBatchWrapper writeBatchWrapper = null;
        ColumnFamilyHandle defaultColumnFamilyHandle = null;
        RocksDBNativeMetricMonitor nativeMetricMonitor = null;
        CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo>();
        // Parameterized to match the parameter type of getRocksDBRestoreOperation (was a raw map).
        LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates = new LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>();
        RocksDB db = null;
        RocksDBRestoreOperation restoreOperation = null;
        CompletableFuture asyncCompactAfterRestoreFuture = null;
        RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = new RocksDbTtlCompactFiltersManager(this.ttlTimeProvider);
        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
        RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
        int keyGroupPrefixBytes = CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix((int)this.numberOfKeyGroups);
        try {
            // Defaults for a fresh backend; overwritten below after an incremental restore.
            UUID backendUID = UUID.randomUUID();
            // Declared as SortedMap (not TreeMap): it is reassigned from the restore result and passed
            // on to a SortedMap parameter, so the variable must not demand the concrete class.
            SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> materializedSstFiles = new TreeMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>();
            long lastCompletedCheckpointId = -1L;
            if (this.injectedTestDB != null) {
                // Test path: no directories, no restore.
                db = this.injectedTestDB;
                defaultColumnFamilyHandle = this.injectedDefaultColumnFamilyHandle;
                nativeMetricMonitor = this.nativeMetricOptions.isEnabled() ? new RocksDBNativeMetricMonitor(this.nativeMetricOptions, this.metricGroup, db, null) : null;
            } else {
                this.prepareDirectories();
                restoreOperation = this.getRocksDBRestoreOperation(keyGroupPrefixBytes, this.cancelStreamRegistry, kvStateInformation, registeredPQStates, ttlCompactFiltersManager);
                RocksDBRestoreResult restoreResult = restoreOperation.restore();
                db = restoreResult.getDb();
                defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
                nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
                asyncCompactAfterRestoreFuture = restoreResult.getAsyncCompactAfterRestoreFuture().orElse(null);
                // Only an incremental restore carries over the backend identity and SST bookkeeping.
                if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
                    backendUID = restoreResult.getBackendUID();
                    materializedSstFiles = restoreResult.getRestoredSstFiles();
                    lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
                }
            }
            // Everything below needs an opened database.
            writeBatchWrapper = new RocksDBWriteBatchWrapper(db, this.optionsContainer.getWriteOptions(), this.writeBatchSize);
            sharedRocksKeyBuilder = new SerializedCompositeKeyBuilder(this.keySerializerProvider.currentSchemaSerializer(), keyGroupPrefixBytes, 32);
            checkpointStrategy = this.initializeSavepointAndCheckpointStrategies(rocksDBResourceGuard, kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
            manualCompactionManager = RocksDBManualCompactionManager.create(db, this.manualCompactionConfig, this.ioExecutor);
            priorityQueueFactory = this.initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db, writeBatchWrapper, nativeMetricMonitor, manualCompactionManager);
        }
        catch (Throwable e) {
            // Clean up whatever was created. The order is deliberate and must be kept: column family
            // handles are closed before the DB, and their options are collected first and closed
            // only after the DB is gone.
            // NOTE(review): a manualCompactionManager created before a failure in
            // initPriorityQueueFactory is not released here - confirm whether it holds resources.
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<ColumnFamilyOptions>(kvStateInformation.values().size());
            IOUtils.closeQuietly((AutoCloseable)cancelStreamRegistryForBackend);
            IOUtils.closeQuietly(writeBatchWrapper);
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, defaultColumnFamilyHandle);
            IOUtils.closeQuietly((AutoCloseable)defaultColumnFamilyHandle);
            IOUtils.closeQuietly(nativeMetricMonitor);
            for (RocksDBKeyedStateBackend.RocksDbKvStateInfo kvStateInfo : kvStateInformation.values()) {
                RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, kvStateInfo.columnFamilyHandle);
                IOUtils.closeQuietly((AutoCloseable)kvStateInfo.columnFamilyHandle);
            }
            IOUtils.closeQuietly((AutoCloseable)db);
            // The DB may have been opened even though a later restore step failed.
            IOUtils.closeQuietly((AutoCloseable)restoreOperation);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly((AutoCloseable)this.optionsContainer);
            ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
            kvStateInformation.clear();
            IOUtils.closeQuietly(checkpointStrategy);
            try {
                FileUtils.deleteDirectory((File)this.instanceBasePath);
            }
            catch (Exception ex) {
                // Best effort only: a leftover directory must not mask the original failure.
                this.logger.warn("Failed to delete base path for RocksDB: " + this.instanceBasePath, (Throwable)ex);
            }
            if (e instanceof BackendBuildingException) {
                throw (BackendBuildingException)e;
            }
            String errMsg = "Caught unexpected exception.";
            this.logger.error(errMsg, e);
            throw new BackendBuildingException(errMsg, e);
        }
        InternalKeyContextImpl keyContext = new InternalKeyContextImpl(this.keyGroupRange, this.numberOfKeyGroups);
        this.logger.info("Finished building RocksDB keyed state-backend at {}.", (Object)this.instanceBasePath);
        return new RocksDBKeyedStateBackend<K>(this.userCodeClassLoader, this.instanceBasePath, this.optionsContainer, this.columnFamilyOptionsFactory, this.kvStateRegistry, this.keySerializerProvider.currentSchemaSerializer(), this.executionConfig, this.ttlTimeProvider, this.latencyTrackingStateConfig, db, kvStateInformation, registeredPQStates, keyGroupPrefixBytes, cancelStreamRegistryForBackend, this.keyGroupCompressionDecorator, rocksDBResourceGuard, checkpointStrategy, writeBatchWrapper, defaultColumnFamilyHandle, nativeMetricMonitor, sharedRocksKeyBuilder, priorityQueueFactory, ttlCompactFiltersManager, keyContext, this.writeBatchSize, asyncCompactAfterRestoreFuture, manualCompactionManager);
    }

    /**
     * Picks the restore operation that matches the restore state handles.
     *
     * <ul>
     *   <li>no (or only null) handles: a "none" restore that just opens a database;
     *   <li>first handle is an {@link IncrementalKeyedStateHandle}: incremental restore;
     *   <li>otherwise a full restore, in the heap-timers flavour when the priority queue type is HEAP.
     * </ul>
     *
     * <p>Only the first handle is inspected to tell incremental from full restore.
     */
    private RocksDBRestoreOperation getRocksDBRestoreOperation(
            int keyGroupPrefixBytes,
            CloseableRegistry cancelStreamRegistry,
            LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
        DBOptions options = optionsContainer.getDbOptions();

        boolean nothingToRestore = CollectionUtil.isEmptyOrAllElementsNull((Collection)restoreStateHandles);
        if (nothingToRestore) {
            return new RocksDBNoneRestoreOperation(
                    kvStateInformation,
                    instanceRocksDBPath,
                    options,
                    columnFamilyOptionsFactory,
                    nativeMetricOptions,
                    metricGroup,
                    ttlCompactFiltersManager,
                    optionsContainer.getWriteBufferManagerCapacity());
        }

        KeyedStateHandle leadingHandle = (KeyedStateHandle)restoreStateHandles.iterator().next();
        if (leadingHandle instanceof IncrementalKeyedStateHandle) {
            return new RocksDBIncrementalRestoreOperation(
                    operatorIdentifier,
                    keyGroupRange,
                    keyGroupPrefixBytes,
                    numberOfTransferingThreads,
                    cancelStreamRegistry,
                    userCodeClassLoader,
                    kvStateInformation,
                    keySerializerProvider,
                    instanceBasePath,
                    instanceRocksDBPath,
                    options,
                    columnFamilyOptionsFactory,
                    nativeMetricOptions,
                    metricGroup,
                    customInitializationMetrics,
                    CollectionUtil.checkedSubTypeCast((Collection)restoreStateHandles, IncrementalKeyedStateHandle.class),
                    ttlCompactFiltersManager,
                    writeBatchSize,
                    optionsContainer.getWriteBufferManagerCapacity(),
                    overlapFractionThreshold,
                    useIngestDbRestoreMode,
                    incrementalRestoreAsyncCompactAfterRescale,
                    rescalingUseDeleteFilesInRange,
                    ioExecutor);
        }

        // Full restore: timers kept on the heap need the variant that also fills registeredPQStates.
        if (priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
            return new RocksDBHeapTimersFullRestoreOperation(
                    keyGroupRange,
                    numberOfKeyGroups,
                    userCodeClassLoader,
                    kvStateInformation,
                    registeredPQStates,
                    createHeapQueueFactory(),
                    keySerializerProvider,
                    instanceRocksDBPath,
                    options,
                    columnFamilyOptionsFactory,
                    nativeMetricOptions,
                    metricGroup,
                    restoreStateHandles,
                    ttlCompactFiltersManager,
                    writeBatchSize,
                    optionsContainer.getWriteBufferManagerCapacity());
        } else {
            return new RocksDBFullRestoreOperation(
                    keyGroupRange,
                    userCodeClassLoader,
                    kvStateInformation,
                    keySerializerProvider,
                    instanceRocksDBPath,
                    options,
                    columnFamilyOptionsFactory,
                    nativeMetricOptions,
                    metricGroup,
                    restoreStateHandles,
                    ttlCompactFiltersManager,
                    writeBatchSize,
                    optionsContainer.getWriteBufferManagerCapacity());
        }
    }

    /**
     * Creates the checkpoint snapshot strategy for the given database.
     *
     * <p>The injected state uploader is used when present; otherwise a new uploader is created from
     * the transfer thread count and the IO executor. The strategy is incremental when incremental
     * checkpointing is enabled and native full otherwise; only the incremental strategy receives the
     * materialized SST files and the last completed checkpoint id.
     *
     * @return the snapshot strategy to hand to the backend
     */
    private RocksDBSnapshotStrategyBase<K, ?> initializeSavepointAndCheckpointStrategies(
            ResourceGuard rocksDBResourceGuard,
            LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
            int keyGroupPrefixBytes,
            RocksDB db,
            UUID backendUID,
            SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> materializedSstFiles,
            long lastCompletedCheckpointId) {
        RocksDBStateUploader uploader;
        if (injectRocksDBStateUploader == null) {
            uploader = new RocksDBStateUploader(RocksDBStateDataTransferHelper.forThreadNumIfSpecified(numberOfTransferingThreads, ioExecutor));
        } else {
            uploader = injectRocksDBStateUploader;
        }

        RocksDBSnapshotStrategyBase strategy;
        if (enableIncrementalCheckpointing) {
            strategy = new RocksIncrementalSnapshotStrategy(
                    db,
                    rocksDBResourceGuard,
                    keySerializerProvider.currentSchemaSerializer(),
                    kvStateInformation,
                    keyGroupRange,
                    keyGroupPrefixBytes,
                    localRecoveryConfig,
                    instanceBasePath,
                    backendUID,
                    materializedSstFiles,
                    uploader,
                    lastCompletedCheckpointId);
        } else {
            strategy = new RocksNativeFullSnapshotStrategy(
                    db,
                    rocksDBResourceGuard,
                    keySerializerProvider.currentSchemaSerializer(),
                    kvStateInformation,
                    keyGroupRange,
                    keyGroupPrefixBytes,
                    localRecoveryConfig,
                    instanceBasePath,
                    backendUID,
                    uploader);
        }
        return strategy;
    }

    /**
     * Creates the priority queue set factory selected by the priority queue configuration: a heap
     * based factory for HEAP, a RocksDB backed factory for ROCKSDB.
     *
     * @return the factory for the configured priority queue state type
     * @throws IllegalArgumentException if the configured type is neither HEAP nor ROCKSDB
     */
    private PriorityQueueSetFactory initPriorityQueueFactory(int keyGroupPrefixBytes, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor, RocksDBManualCompactionManager manualCompactionManager) {
        // Each arm returns directly, so no intermediate variable (previously typed Object, which
        // does not match the declared return type) is needed.
        switch (this.priorityQueueConfig.getPriorityQueueStateType()) {
            case HEAP: {
                return this.createHeapQueueFactory();
            }
            case ROCKSDB: {
                return new RocksDBPriorityQueueSetFactory(
                        this.keyGroupRange,
                        keyGroupPrefixBytes,
                        this.numberOfKeyGroups,
                        kvStateInformation,
                        db,
                        this.optionsContainer.getReadOptions(),
                        writeBatchWrapper,
                        nativeMetricMonitor,
                        this.columnFamilyOptionsFactory,
                        this.optionsContainer.getWriteBufferManagerCapacity(),
                        this.priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize(),
                        manualCompactionManager);
            }
            default: {
                throw new IllegalArgumentException("Unknown priority queue state type: " + this.priorityQueueConfig.getPriorityQueueStateType());
            }
        }
    }

    /**
     * Creates a heap based priority queue set factory for this builder's key group range and number
     * of key groups.
     */
    private HeapPriorityQueueSetFactory createHeapQueueFactory() {
        // NOTE(review): 128 is presumably the minimum/initial capacity of each queue - confirm
        // against HeapPriorityQueueSetFactory.
        return new HeapPriorityQueueSetFactory(this.keyGroupRange, this.numberOfKeyGroups, 128);
    }

    /**
     * Prepares the instance directories before a database is opened: makes sure the base path
     * exists and, if a RocksDB directory is already present below it, wipes the base path.
     *
     * @throws IOException if the base path cannot be created or the deletion fails
     */
    private void prepareDirectories() throws IOException {
        checkAndCreateDirectory(instanceBasePath);
        if (!instanceRocksDBPath.exists()) {
            return;
        }
        // NOTE(review): this removes the whole base path (not only the db child), so the directory
        // created above no longer exists afterwards - confirm that later steps recreate it.
        FileUtils.deleteDirectory((File)instanceBasePath);
    }

    /**
     * Sets the manual compaction configuration used when creating the manual compaction manager.
     *
     * @param manualCompactionConfig the configuration, must not be null
     * @return this builder, for chaining
     */
    public RocksDBKeyedStateBackendBuilder<K> setManualCompactionConfig(RocksDBManualCompactionConfig manualCompactionConfig) {
        // Reject null first; the field is only touched once the argument is known to be valid.
        Preconditions.checkNotNull((Object)manualCompactionConfig);
        this.manualCompactionConfig = manualCompactionConfig;
        return this;
    }
}

