/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapSnapshotResources;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateUID;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.function.SupplierWithException;

/**
 * {@link SnapshotStrategy} that writes the registered key/value state tables and priority-queue
 * states of a heap keyed backend into a checkpoint stream, one key group at a time.
 *
 * @param <K> type of the key handled by the key serializer.
 */
class HeapSnapshotStrategy<K>
implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
    /** Registered key/value state tables, keyed by a String identifier. */
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    /** Registered priority-queue state wrappers, keyed by a String identifier. */
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
    /** Decorator applied to the checkpoint stream while the state of a key group is written. */
    private final StreamCompressionDecorator keyGroupCompressionDecorator;
    /** Decides whether a duplicating (local backup) stream is used for checkpoints. */
    private final LocalRecoveryConfig localRecoveryConfig;
    /** Key groups that are written by this strategy. */
    private final KeyGroupRange keyGroupRange;
    /** Source of the key serializer returned by {@link #getKeySerializer()}. */
    private final StateSerializerProvider<K> keySerializerProvider;
    /** Forwarded unchanged to {@link HeapSnapshotResources#create}. */
    private final int totalKeyGroups;

    /**
     * Creates the strategy. All arguments are stored as given; no copies are made.
     *
     * @param registeredKVStates registered key/value state tables.
     * @param registeredPQStates registered priority-queue state wrappers.
     * @param keyGroupCompressionDecorator decorator used when writing key-group state.
     * @param localRecoveryConfig local recovery configuration.
     * @param keyGroupRange key groups to snapshot.
     * @param keySerializerProvider provider of the key serializer.
     * @param totalKeyGroups value passed on to {@link HeapSnapshotResources#create}.
     */
    HeapSnapshotStrategy(Map<String, StateTable<K, ?, ?>> registeredKVStates, Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, StreamCompressionDecorator keyGroupCompressionDecorator, LocalRecoveryConfig localRecoveryConfig, KeyGroupRange keyGroupRange, StateSerializerProvider<K> keySerializerProvider, int totalKeyGroups) {
        this.registeredKVStates = registeredKVStates;
        this.registeredPQStates = registeredPQStates;
        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
        this.localRecoveryConfig = localRecoveryConfig;
        this.keyGroupRange = keyGroupRange;
        this.keySerializerProvider = keySerializerProvider;
        this.totalKeyGroups = totalKeyGroups;
    }

    /**
     * Builds the snapshot resources from the currently registered states.
     *
     * @param checkpointId id of the checkpoint; not used by this implementation.
     * @return the resources produced by {@link HeapSnapshotResources#create}.
     */
    @Override
    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
        return HeapSnapshotResources.create(registeredKVStates, registeredPQStates, keyGroupCompressionDecorator, keyGroupRange, getKeySerializer(), totalKeyGroups);
    }

    /**
     * Returns a supplier that writes the prepared resources to a checkpoint stream.
     *
     * <p>If the resources carry no state meta information, the returned supplier yields
     * {@link SnapshotResult#empty()} without opening a stream. Otherwise the supplier writes the
     * serialization proxy, then for every key group of {@link #keyGroupRange} records the current
     * stream position, writes the key-group id and, per state snapshot, the state's numeric id
     * (as a short) followed by that state's data for the key group.
     *
     * <p>A duplicating stream is used when local backup is enabled and the checkpoint type is not
     * a savepoint; otherwise a simple stream is used.
     *
     * <p>Any failure while writing a state propagates out of the supplier; it is never swallowed.
     * The supplier throws an {@link IOException} if the stream had already been unregistered from
     * the closeable registry by the time writing finished.
     *
     * @param syncPartResource resources created by {@link #syncPrepareResources(long)}.
     * @param checkpointId id of the checkpoint, passed to the duplicating stream factory method.
     * @param timestamp checkpoint timestamp; not used by this implementation.
     * @param streamFactory factory for the checkpoint output stream.
     * @param checkpointOptions options whose checkpoint type selects the stream kind.
     * @return supplier that performs the write and returns the resulting keyed state handle.
     */
    @Override
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(HeapSnapshotResources<K> syncPartResource, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        List<StateMetaInfoSnapshot> metaInfoSnapshots = syncPartResource.getMetaInfoSnapshots();
        if (metaInfoSnapshots.isEmpty()) {
            return snapshotCloseableRegistry -> SnapshotResult.empty();
        }

        // The proxy's boolean flag is true whenever a decorator other than the uncompressed
        // singleton is configured.
        boolean usesCompression = !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator);
        KeyedBackendSerializationProxy<K> serializationProxy = new KeyedBackendSerializationProxy<>(syncPartResource.getKeySerializer(), metaInfoSnapshots, usesCompression);

        boolean writeLocalBackup = localRecoveryConfig.isLocalBackupEnabled() && !checkpointOptions.getCheckpointType().isSavepoint();
        SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier = writeLocalBackup
                ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, streamFactory, localRecoveryConfig.getLocalStateDirectoryProvider().orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
                : () -> CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, streamFactory);

        return snapshotCloseableRegistry -> {
            Map<StateUID, Integer> stateNamesToId = syncPartResource.getStateNamesToId();
            Map<StateUID, StateSnapshot> cowStateStableSnapshots = syncPartResource.getCowStateStableSnapshots();
            CheckpointStreamWithResultProvider streamWithResultProvider = checkpointStreamSupplier.get();
            // Registered so the registry can close the stream while writing is in progress.
            snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);

            CheckpointStateOutputStream localStream = streamWithResultProvider.getCheckpointOutputStream();
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
            serializationProxy.write(outView);

            int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
            long[] keyGroupRangeOffsets = new long[numberOfKeyGroups];
            for (int keyGroupPos = 0; keyGroupPos < numberOfKeyGroups; ++keyGroupPos) {
                int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
                // Offset of this key group's data within the stream.
                keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
                outView.writeInt(keyGroupId);

                for (Map.Entry<StateUID, StateSnapshot> stateSnapshot : cowStateStableSnapshots.entrySet()) {
                    StateSnapshot.StateKeyGroupWriter partitionedSnapshot = stateSnapshot.getValue().getKeyGroupWriter();
                    // try-with-resources closes the decorated stream and lets a write failure
                    // propagate (a close failure is attached as suppressed). localStream is
                    // written to again after this close, so the decorator is expected not to
                    // close the underlying stream -- NOTE(review): confirm against the decorator.
                    try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
                        DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
                        kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));
                        partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);
                    }
                }
            }

            // Only finalize the stream if it is still registered.
            if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
                KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
                SnapshotResult<StreamStateHandle> result = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs, KeyGroupsStateHandle::new);
            }
            throw new IOException("Stream already unregistered.");
        };
    }

    /**
     * Returns the key serializer for the current schema.
     *
     * @return the result of {@code keySerializerProvider.currentSchemaSerializer()}.
     */
    public TypeSerializer<K> getKeySerializer() {
        return keySerializerProvider.currentSchemaSerializer();
    }
}

