/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb.snapshot;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources>
implements CheckpointListener,
SnapshotStrategy<KeyedStateHandle, NativeRocksDBSnapshotResources>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
    @Nonnull
    private final String description;
    @Nonnull
    protected RocksDB db;
    @Nonnull
    protected final ResourceGuard rocksDBResourceGuard;
    @Nonnull
    protected final TypeSerializer<K> keySerializer;
    @Nonnull
    protected final LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
    @Nonnull
    protected final KeyGroupRange keyGroupRange;
    @Nonnegative
    protected final int keyGroupPrefixBytes;
    @Nonnull
    protected final LocalRecoveryConfig localRecoveryConfig;
    @Nonnull
    protected final File instanceBasePath;
    protected final String localDirectoryName;
    @Nonnull
    protected final UUID backendUID;
    protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(Collections.emptyList());

    public RocksDBSnapshotStrategyBase(@Nonnull String description, @Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull File instanceBasePath, @Nonnull UUID backendUID) {
        this.db = db;
        this.rocksDBResourceGuard = rocksDBResourceGuard;
        this.keySerializer = keySerializer;
        this.kvStateInformation = kvStateInformation;
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.localRecoveryConfig = localRecoveryConfig;
        this.description = description;
        this.instanceBasePath = instanceBasePath;
        this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
        this.backendUID = backendUID;
    }

    @Nonnull
    public String getDescription() {
        return this.description;
    }

    public NativeRocksDBSnapshotResources syncPrepareResources(long checkpointId) throws Exception {
        SnapshotDirectory snapshotDirectory = this.prepareLocalSnapshotDirectory(checkpointId);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", (Object)snapshotDirectory);
        ArrayList<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(this.kvStateInformation.size());
        PreviousSnapshot previousSnapshot = this.snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
        this.takeDBNativeCheckpoint(snapshotDirectory);
        return new NativeRocksDBSnapshotResources(snapshotDirectory, previousSnapshot, stateMetaInfoSnapshots);
    }

    protected abstract PreviousSnapshot snapshotMetaData(long var1, @Nonnull List<StateMetaInfoSnapshot> var3);

    private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory) throws Exception {
        try (ResourceGuard.Lease ignored = this.rocksDBResourceGuard.acquireResource();
             Checkpoint checkpoint = Checkpoint.create((RocksDB)this.db);){
            checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
        }
        catch (Exception ex) {
            try {
                outputDirectory.cleanup();
            }
            catch (IOException cleanupEx) {
                ex = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)cleanupEx, (Throwable)ex);
            }
            throw ex;
        }
    }

    @Nonnull
    protected SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException {
        if (this.localRecoveryConfig.isLocalBackupEnabled()) {
            LocalSnapshotDirectoryProvider directoryProvider = (LocalSnapshotDirectoryProvider)this.localRecoveryConfig.getLocalStateDirectoryProvider().orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
            File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
            if (!directory.exists() && !directory.mkdirs()) {
                throw new IOException("Local state base directory for checkpoint " + checkpointId + " does not exist and could not be created: " + String.valueOf(directory));
            }
            File rdbSnapshotDir = new File(directory, this.localDirectoryName);
            if (rdbSnapshotDir.exists()) {
                FileUtils.deleteDirectory((File)rdbSnapshotDir);
            }
            Path path = rdbSnapshotDir.toPath();
            try {
                return SnapshotDirectory.permanent((Path)path);
            }
            catch (IOException ex) {
                try {
                    FileUtils.deleteDirectory((File)directory);
                }
                catch (IOException delEx) {
                    ex = (IOException)ExceptionUtils.firstOrSuppressed((Throwable)delEx, (Throwable)ex);
                }
                throw ex;
            }
        }
        File snapshotDir = new File(this.instanceBasePath, "chk-" + checkpointId);
        return SnapshotDirectory.temporary((File)snapshotDir);
    }

    protected void cleanupIncompleteSnapshot(@Nonnull CloseableRegistry tmpResourcesRegistry, @Nonnull SnapshotDirectory localBackupDirectory) {
        try {
            tmpResourcesRegistry.close();
        }
        catch (Exception e) {
            LOG.warn("Could not properly clean tmp resources.", (Throwable)e);
        }
        if (localBackupDirectory.isSnapshotCompleted()) {
            try {
                DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
                if (directoryStateHandle != null) {
                    directoryStateHandle.discardState();
                }
            }
            catch (Exception e) {
                LOG.warn("Could not properly discard local state.", (Throwable)e);
            }
        }
    }

    @Nonnull
    protected SnapshotResult<StreamStateHandle> materializeMetaData(@Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        CheckpointStreamWithResultProvider streamWithResultProvider = this.localRecoveryConfig.isLocalBackupEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)checkpointStreamFactory, (LocalSnapshotDirectoryProvider)((LocalSnapshotDirectoryProvider)this.localRecoveryConfig.getLocalStateDirectoryProvider().orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))) : CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)checkpointStreamFactory);
        snapshotCloseableRegistry.registerCloseable((AutoCloseable)streamWithResultProvider);
        try {
            KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.keySerializer, stateMetaInfoSnapshots, false);
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)streamWithResultProvider.getCheckpointOutputStream());
            serializationProxy.write((DataOutputView)out);
            if (snapshotCloseableRegistry.unregisterCloseable((AutoCloseable)streamWithResultProvider)) {
                SnapshotResult result = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                streamWithResultProvider = null;
                tmpResourcesRegistry.registerCloseable(() -> StateUtil.discardStateObjectQuietly((StateObject)result));
                SnapshotResult snapshotResult = result;
                return snapshotResult;
            }
            throw new IOException("Stream already closed and cannot return a handle.");
        }
        finally {
            if (snapshotCloseableRegistry.unregisterCloseable((AutoCloseable)streamWithResultProvider)) {
                IOUtils.closeQuietly((AutoCloseable)streamWithResultProvider);
            }
        }
    }

    @Override
    public abstract void close() throws IOException;

    protected static class PreviousSnapshot {
        @Nonnull
        private final Map<String, StreamStateHandle> confirmedSstFiles;

        protected PreviousSnapshot(@Nullable Collection<IncrementalKeyedStateHandle.HandleAndLocalPath> confirmedSstFiles) {
            this.confirmedSstFiles = confirmedSstFiles != null ? confirmedSstFiles.stream().collect(Collectors.toMap(IncrementalKeyedStateHandle.HandleAndLocalPath::getLocalPath, IncrementalKeyedStateHandle.HandleAndLocalPath::getHandle)) : Collections.emptyMap();
        }

        protected Optional<StreamStateHandle> getUploaded(String filename) {
            if (this.confirmedSstFiles.containsKey(filename)) {
                StreamStateHandle handle = this.confirmedSstFiles.get(filename);
                return Optional.of(new PlaceholderStreamStateHandle(handle.getStreamStateHandleID(), handle.getStateSize(), FileMergingSnapshotManager.isFileMergingHandle((StreamStateHandle)handle)));
            }
            return Optional.empty();
        }
    }

    protected static class NativeRocksDBSnapshotResources
    implements SnapshotResources {
        @Nonnull
        protected final SnapshotDirectory snapshotDirectory;
        @Nonnull
        protected final PreviousSnapshot previousSnapshot;
        @Nonnull
        protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

        public NativeRocksDBSnapshotResources(SnapshotDirectory snapshotDirectory, PreviousSnapshot previousSnapshot, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            this.snapshotDirectory = snapshotDirectory;
            this.previousSnapshot = previousSnapshot;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
        }

        public void release() {
            try {
                if (this.snapshotDirectory.exists()) {
                    LOG.trace("Running cleanup for local RocksDB backup directory {}.", (Object)this.snapshotDirectory);
                    boolean cleanupOk = this.snapshotDirectory.cleanup();
                    if (!cleanupOk) {
                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                    }
                }
            }
            catch (IOException e) {
                LOG.warn("Could not properly cleanup local RocksDB backup directory.", (Throwable)e);
            }
        }
    }

    protected abstract class RocksDBSnapshotOperation
    implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {
        protected final long checkpointId;
        @Nonnull
        protected final CheckpointStreamFactory checkpointStreamFactory;
        @Nonnull
        protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        @Nonnull
        protected final SnapshotDirectory localBackupDirectory;
        @Nonnull
        protected final CloseableRegistry tmpResourcesRegistry;

        protected RocksDBSnapshotOperation(@Nonnull long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull SnapshotDirectory localBackupDirectory, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            this.checkpointId = checkpointId;
            this.checkpointStreamFactory = checkpointStreamFactory;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            this.localBackupDirectory = localBackupDirectory;
            this.tmpResourcesRegistry = new CloseableRegistry();
        }

        protected Optional<KeyedStateHandle> getLocalSnapshot(@Nullable StreamStateHandle localStreamStateHandle, List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedState) throws IOException {
            DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
            if (directoryStateHandle != null && localStreamStateHandle != null) {
                return Optional.of(new IncrementalLocalKeyedStateHandle(RocksDBSnapshotStrategyBase.this.backendUID, this.checkpointId, directoryStateHandle, RocksDBSnapshotStrategyBase.this.keyGroupRange, localStreamStateHandle, sharedState));
            }
            return Optional.empty();
        }
    }
}

