/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.File;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksIncrementalSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";
    @Nonnull
    private final SortedMap<Long, Map<StateHandleID, Long>> uploadedStateIDs = new TreeMap<Long, Map<StateHandleID, Long>>();
    private long lastCompletedCheckpointId;
    private final RocksDBStateUploader stateUploader;

    public RocksIncrementalSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull CloseableRegistry cancelStreamRegistry, @Nonnull File instanceBasePath, @Nonnull UUID backendUID, @Nonnull SortedMap<Long, Map<StateHandleID, StreamStateHandle>> uploadedStateHandles, @Nonnull RocksDBStateUploader rocksDBStateUploader, long lastCompletedCheckpointId) {
        super(DESCRIPTION, db, rocksDBResourceGuard, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, instanceBasePath, backendUID);
        for (Map.Entry<Long, Map<StateHandleID, StreamStateHandle>> entry : uploadedStateHandles.entrySet()) {
            HashMap<StateHandleID, Long> map = new HashMap<StateHandleID, Long>();
            for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry : entry.getValue().entrySet()) {
                map.put(stateHandleEntry.getKey(), stateHandleEntry.getValue().getStateSize());
            }
            this.uploadedStateIDs.put(entry.getKey(), map);
        }
        this.stateUploader = rocksDBStateUploader;
        this.lastCompletedCheckpointId = lastCompletedCheckpointId;
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources snapshotResources, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot;
        if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)timestamp);
            }
            return registry -> SnapshotResult.empty();
        }
        SnapshotType.SharingFilesStrategy sharingFilesStrategy = checkpointOptions.getCheckpointType().getSharingFilesStrategy();
        switch (sharingFilesStrategy) {
            case FORWARD_BACKWARD: {
                previousSnapshot = snapshotResources.previousSnapshot;
                break;
            }
            case FORWARD: 
            case NO_SHARING: {
                previousSnapshot = EMPTY_PREVIOUS_SNAPSHOT;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported sharing files strategy: " + sharingFilesStrategy);
            }
        }
        return new RocksDBIncrementalSnapshotOperation(checkpointId, checkpointStreamFactory, snapshotResources.snapshotDirectory, previousSnapshot, sharingFilesStrategy, snapshotResources.stateMetaInfoSnapshots);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long completedCheckpointId) {
        SortedMap<Long, Map<StateHandleID, Long>> sortedMap = this.uploadedStateIDs;
        synchronized (sortedMap) {
            if (completedCheckpointId > this.lastCompletedCheckpointId && this.uploadedStateIDs.containsKey(completedCheckpointId)) {
                this.uploadedStateIDs.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
                this.lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointAborted(long abortedCheckpointId) {
        SortedMap<Long, Map<StateHandleID, Long>> sortedMap = this.uploadedStateIDs;
        synchronized (sortedMap) {
            this.uploadedStateIDs.keySet().remove(abortedCheckpointId);
        }
    }

    @Override
    public void close() {
        this.stateUploader.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected RocksDBSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        Map confirmedSstFiles;
        long lastCompletedCheckpoint;
        SortedMap<Long, Map<StateHandleID, Long>> sortedMap = this.uploadedStateIDs;
        synchronized (sortedMap) {
            lastCompletedCheckpoint = this.lastCompletedCheckpointId;
            confirmedSstFiles = (Map)this.uploadedStateIDs.get(lastCompletedCheckpoint);
            LOG.trace("Use confirmed SST files for checkpoint {}: {}", (Object)checkpointId, (Object)confirmedSstFiles);
        }
        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) confirmed files as base: {}.", new Object[]{checkpointId, lastCompletedCheckpoint, confirmedSstFiles});
        for (Map.Entry entry : this.kvStateInformation.entrySet()) {
            stateMetaInfoSnapshots.add(((RocksDBKeyedStateBackend.RocksDbKvStateInfo)entry.getValue()).metaInfo.snapshot());
        }
        return new RocksDBSnapshotStrategyBase.PreviousSnapshot(confirmedSstFiles);
    }

    private final class RocksDBIncrementalSnapshotOperation
    extends RocksDBSnapshotStrategyBase.RocksDBSnapshotOperation {
        @Nonnull
        private final RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot;
        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;

        private RocksDBIncrementalSnapshotOperation(@Nonnull long checkpointId, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull SnapshotDirectory localBackupDirectory, @Nonnull RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot, @Nonnull SnapshotType.SharingFilesStrategy sharingFilesStrategy, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            super(RocksIncrementalSnapshotStrategy.this, checkpointId, checkpointStreamFactory, localBackupDirectory, stateMetaInfoSnapshots);
            this.previousSnapshot = previousSnapshot;
            this.sharingFilesStrategy = sharingFilesStrategy;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
            boolean completed = false;
            SnapshotResult<StreamStateHandle> metaStateHandle = null;
            HashMap<StateHandleID, StreamStateHandle> sstFiles = new HashMap<StateHandleID, StreamStateHandle>();
            HashMap<StateHandleID, StreamStateHandle> miscFiles = new HashMap<StateHandleID, StreamStateHandle>();
            try {
                metaStateHandle = RocksIncrementalSnapshotStrategy.this.materializeMetaData(snapshotCloseableRegistry, this.tmpResourcesRegistry, this.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                Preconditions.checkNotNull(metaStateHandle, (String)"Metadata was not properly created.");
                Preconditions.checkNotNull((Object)metaStateHandle.getJobManagerOwnedSnapshot(), (String)"Metadata for job manager was not properly created.");
                this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry, this.tmpResourcesRegistry);
                long checkpointedSize = metaStateHandle.getStateSize();
                checkpointedSize += RocksSnapshotUtil.getUploadedStateSize(sstFiles.values());
                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize += RocksSnapshotUtil.getUploadedStateSize(miscFiles.values()));
                Optional<KeyedStateHandle> localSnapshot = this.getLocalSnapshot((StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles);
                SnapshotResult snapshotResult = localSnapshot.map(keyedStateHandle -> SnapshotResult.withLocalState((StateObject)jmIncrementalKeyedStateHandle, (StateObject)keyedStateHandle)).orElseGet(() -> SnapshotResult.of((StateObject)jmIncrementalKeyedStateHandle));
                completed = true;
                SnapshotResult snapshotResult2 = snapshotResult;
                return snapshotResult2;
            }
            finally {
                if (!completed) {
                    RocksIncrementalSnapshotStrategy.this.cleanupIncompleteSnapshot(this.tmpResourcesRegistry, this.localBackupDirectory);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void uploadSstFiles(@Nonnull Map<StateHandleID, StreamStateHandle> sstFiles, @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception {
            Preconditions.checkState((boolean)this.localBackupDirectory.exists());
            HashMap<StateHandleID, Path> sstFilePaths = new HashMap<StateHandleID, Path>();
            HashMap<StateHandleID, Path> miscFilePaths = new HashMap<StateHandleID, Path>();
            Path[] files = this.localBackupDirectory.listDirectory();
            if (files != null) {
                this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                CheckpointedStateScope stateScope = this.sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED;
                sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry));
                miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry));
                SortedMap sortedMap = RocksIncrementalSnapshotStrategy.this.uploadedStateIDs;
                synchronized (sortedMap) {
                    switch (this.sharingFilesStrategy) {
                        case FORWARD_BACKWARD: 
                        case FORWARD: {
                            RocksIncrementalSnapshotStrategy.this.uploadedStateIDs.put(this.checkpointId, sstFiles.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> ((StreamStateHandle)t.getValue()).getStateSize())));
                            break;
                        }
                        case NO_SHARING: {
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException("Unsupported sharing files strategy: " + this.sharingFilesStrategy);
                        }
                    }
                }
            }
        }

        private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {
            for (Path filePath : files) {
                String fileName = filePath.getFileName().toString();
                StateHandleID stateHandleID = new StateHandleID(fileName);
                if (fileName.endsWith(".sst")) {
                    Optional<StreamStateHandle> uploaded = this.previousSnapshot.getUploaded(stateHandleID);
                    if (uploaded.isPresent()) {
                        sstFiles.put(stateHandleID, uploaded.get());
                        continue;
                    }
                    sstFilePaths.put(stateHandleID, filePath);
                    continue;
                }
                miscFilePaths.put(stateHandleID, filePath);
            }
        }
    }
}

