/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformers;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapAggregatingState;
import org.apache.flink.runtime.state.heap.HeapListState;
import org.apache.flink.runtime.state.heap.HeapMapState;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapReducingState;
import org.apache.flink.runtime.state.heap.HeapSnapshotResources;
import org.apache.flink.runtime.state.heap.HeapSnapshotStrategy;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateTableFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);

    /**
     * Factories used to build a state object the first time a state name is requested, keyed by
     * the descriptor type. Types that are absent from this map are rejected by
     * {@code createOrUpdateInternalState}.
     *
     * <p>Each method reference is cast explicitly: a method reference has no type of its own, so
     * it needs the functional-interface target that the generic {@code Tuple2.of} call cannot
     * supply.
     */
    private static final Map<StateDescriptor.Type, StateCreateFactory> STATE_CREATE_FACTORIES =
            Stream.of(
                            Tuple2.of(
                                    StateDescriptor.Type.VALUE,
                                    (StateCreateFactory) HeapValueState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.LIST,
                                    (StateCreateFactory) HeapListState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.MAP,
                                    (StateCreateFactory) HeapMapState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.AGGREGATING,
                                    (StateCreateFactory) HeapAggregatingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.REDUCING,
                                    (StateCreateFactory) HeapReducingState::create))
                    .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));

    /**
     * Factories used to refresh an already created state object when the same state name is
     * requested again, keyed by the descriptor type.
     */
    private static final Map<StateDescriptor.Type, StateUpdateFactory> STATE_UPDATE_FACTORIES =
            Stream.of(
                            Tuple2.of(
                                    StateDescriptor.Type.VALUE,
                                    (StateUpdateFactory) HeapValueState::update),
                            Tuple2.of(
                                    StateDescriptor.Type.LIST,
                                    (StateUpdateFactory) HeapListState::update),
                            Tuple2.of(
                                    StateDescriptor.Type.MAP,
                                    (StateUpdateFactory) HeapMapState::update),
                            Tuple2.of(
                                    StateDescriptor.Type.AGGREGATING,
                                    (StateUpdateFactory) HeapAggregatingState::update),
                            Tuple2.of(
                                    StateDescriptor.Type.REDUCING,
                                    (StateUpdateFactory) HeapReducingState::update))
                    .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));

    /** State objects that have already been created, keyed by state name. */
    private final Map<String, State> createdKVStates;

    /** State tables of all registered key/value states, keyed by state name. */
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;

    /** Local recovery configuration supplied at construction time. */
    private final LocalRecoveryConfig localRecoveryConfig;

    /** Strategy handed to the snapshot runner for every checkpoint. */
    private final SnapshotStrategy<KeyedStateHandle, ?> checkpointStrategy;

    /** Execution type passed on to the snapshot runner and to savepoint resources. */
    private final SnapshotExecutionType snapshotExecutionType;

    /** Creates the state table for a state name that has not been registered yet. */
    private final StateTableFactory<K> stateTableFactory;

    /** Owns the priority-queue states; both {@code create} overloads delegate to it. */
    private final HeapPriorityQueuesManager priorityQueuesManager;

    /**
     * Builds the backend from pre-assembled components.
     *
     * <p>The first eight arguments and {@code keyContext} are forwarded to the superclass
     * constructor. The remaining arguments are stored on this instance, except for
     * {@code registeredPQStates} and {@code priorityQueueSetFactory}, which are handed to a new
     * {@link HeapPriorityQueuesManager} together with the key-group range and key-group count read
     * from {@code keyContext}.
     */
    public HeapKeyedStateBackend(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            CloseableRegistry cancelStreamRegistry,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            Map<String, StateTable<K, ?, ?>> registeredKVStates,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            LocalRecoveryConfig localRecoveryConfig,
            HeapPriorityQueueSetFactory priorityQueueSetFactory,
            HeapSnapshotStrategy<K> checkpointStrategy,
            SnapshotExecutionType snapshotExecutionType,
            StateTableFactory<K> stateTableFactory,
            InternalKeyContext<K> keyContext) {
        super(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                executionConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                cancelStreamRegistry,
                keyGroupCompressionDecorator,
                keyContext);

        // Key/value state bookkeeping: restored tables come in, created states start empty.
        this.createdKVStates = new HashMap<>();
        this.registeredKVStates = registeredKVStates;
        this.stateTableFactory = stateTableFactory;

        // Snapshot related collaborators.
        this.checkpointStrategy = checkpointStrategy;
        this.snapshotExecutionType = snapshotExecutionType;
        this.localRecoveryConfig = localRecoveryConfig;

        this.priorityQueuesManager =
                new HeapPriorityQueuesManager(
                        registeredPQStates,
                        priorityQueueSetFactory,
                        keyContext.getKeyGroupRange(),
                        keyContext.getNumberOfKeyGroups());

        LOG.info("Initializing heap keyed state backend with stream factory.");
    }

    /**
     * Creates or updates the priority queue registered under {@code stateName} by delegating to
     * the priority-queue manager.
     *
     * @param stateName name of the priority-queue state
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @return the queue returned by the manager
     */
    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
    }

    /**
     * Variant of {@code create(String, TypeSerializer)} that also forwards
     * {@code allowFutureMetadataUpdates} to the priority-queue manager.
     *
     * @param stateName name of the priority-queue state
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @param allowFutureMetadataUpdates passed through unchanged to the manager
     * @return the queue returned by the manager
     */
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
                    boolean allowFutureMetadataUpdates) {
        return priorityQueuesManager.createOrUpdate(
                stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
    }

    /**
     * Returns the state table for {@code stateDesc}, registering a new one when the name is not
     * known yet.
     *
     * <p>For an already registered table the stored meta info is updated with the new snapshot
     * transform factory and the new serializers, after checking serializer compatibility. For an
     * unknown name a fresh table is created through the state table factory and registered.
     *
     * @param namespaceSerializer serializer for the namespace of the requested state
     * @param stateDesc descriptor providing the state's name, type and value serializer
     * @param snapshotTransformFactory transform factory stored in the table's meta info
     * @param allowFutureMetadataUpdates when {@code true}, the meta info is replaced by the result
     *     of {@code withSerializerUpgradesAllowed()}
     * @return the existing or newly registered state table
     * @throws StateMigrationException if the namespace serializer is incompatible or only
     *     compatible after migration, or if the state serializer is incompatible
     */
    private <N, V> StateTable<K, N, V> tryRegisterStateTable(
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<?, V> stateDesc,
            @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<V> snapshotTransformFactory,
            boolean allowFutureMetadataUpdates)
            throws StateMigrationException {
        // The map stores tables with wildcard namespace/value types; the caller-supplied
        // serializers are validated against the stored meta info below.
        @SuppressWarnings("unchecked")
        StateTable<K, N, V> stateTable =
                (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());
        TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

        if (stateTable != null) {
            RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo =
                    stateTable.getMetaInfo();
            restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);

            // Read the previous serializer before the update so it can appear in the error
            // message.
            TypeSerializer<N> previousNamespaceSerializer =
                    restoredKvMetaInfo.getNamespaceSerializer();
            TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
                    restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
            if (namespaceCompatibility.isCompatibleAfterMigration()
                    || namespaceCompatibility.isIncompatible()) {
                throw new StateMigrationException(
                        "For heap backends, the new namespace serializer ("
                                + namespaceSerializer
                                + ") must be compatible with the old namespace serializer ("
                                + previousNamespaceSerializer
                                + ").");
            }

            restoredKvMetaInfo.checkStateMetaInfo(stateDesc);

            // Same as above: capture the previous state serializer before it is replaced.
            TypeSerializer<V> previousStateSerializer = restoredKvMetaInfo.getStateSerializer();
            TypeSerializerSchemaCompatibility<V> stateCompatibility =
                    restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException(
                        "For heap backends, the new state serializer ("
                                + newStateSerializer
                                + ") must not be incompatible with the old state serializer ("
                                + previousStateSerializer
                                + ").");
            }

            restoredKvMetaInfo =
                    allowFutureMetadataUpdates
                            ? restoredKvMetaInfo.withSerializerUpgradesAllowed()
                            : restoredKvMetaInfo;
            stateTable.setMetaInfo(restoredKvMetaInfo);
        } else {
            RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo =
                    new RegisteredKeyValueStateBackendMetaInfo<>(
                            stateDesc.getType(),
                            stateDesc.getName(),
                            namespaceSerializer,
                            newStateSerializer,
                            snapshotTransformFactory);
            newMetaInfo =
                    allowFutureMetadataUpdates
                            ? newMetaInfo.withSerializerUpgradesAllowed()
                            : newMetaInfo;
            stateTable = stateTableFactory.newStateTable(keyContext, newMetaInfo, keySerializer);
            registeredKVStates.put(stateDesc.getName(), stateTable);
        }
        return stateTable;
    }

    /**
     * Streams the keys stored for the given state name and namespace.
     *
     * @param state name of the registered key/value state
     * @param namespace namespace to look up in the state table
     * @return the table's keys for {@code namespace}, or an empty stream when no table is
     *     registered under {@code state}
     */
    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        // Single lookup instead of containsKey + get. The namespace type is chosen by the
        // caller and cannot be checked against the stored table, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        final StateTable<K, N, ?> table = (StateTable<K, N, ?>) registeredKVStates.get(state);
        if (table == null) {
            return Stream.empty();
        }
        return table.getKeys(namespace);
    }

    /**
     * Streams every (key, namespace) pair stored for the given state name.
     *
     * @param state name of the registered key/value state
     * @return the table's key/namespace pairs, or an empty stream when no table is registered
     *     under {@code state}
     */
    @Override
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        // Single lookup instead of containsKey + get. The namespace type is chosen by the
        // caller and cannot be checked against the stored table, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        final StateTable<K, N, ?> table = (StateTable<K, N, ?>) registeredKVStates.get(state);
        if (table == null) {
            return Stream.empty();
        }
        return table.getKeysAndNamespaces();
    }

    /**
     * Convenience overload that delegates to the four-argument variant with
     * {@code allowFutureMetadataUpdates} set to {@code false}.
     */
    @Nonnull
    @Override
    public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<S, SV> stateDesc,
            @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
            throws Exception {
        final boolean allowFutureMetadataUpdates = false;
        return createOrUpdateInternalState(
                namespaceSerializer,
                stateDesc,
                snapshotTransformFactory,
                allowFutureMetadataUpdates);
    }

    /**
     * Creates the state object for {@code stateDesc}, or refreshes the one created earlier under
     * the same name.
     *
     * <p>The state table is registered (or updated) first. If no state object has been created
     * for the name yet, the create factory for the descriptor's type builds one; otherwise the
     * update factory is applied to the existing object. The result is stored under the state name
     * and returned.
     *
     * @param namespaceSerializer serializer for the state's namespace
     * @param stateDesc descriptor of the requested state
     * @param snapshotTransformFactory element-level snapshot transform factory
     * @param allowFutureMetadataUpdates forwarded to the state table registration
     * @return the created or updated state object
     * @throws FlinkRuntimeException if no factory exists for the descriptor's type
     * @throws Exception if state table registration or a factory fails
     */
    @Override
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<S, SV> stateDesc,
            @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
            boolean allowFutureMetadataUpdates)
            throws Exception {
        StateTable<K, N, SV> stateTable =
                tryRegisterStateTable(
                        namespaceSerializer,
                        stateDesc,
                        getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory),
                        allowFutureMetadataUpdates);

        // The cache is keyed by name only, so the concrete state type requested by the caller
        // cannot be checked here.
        @SuppressWarnings("unchecked")
        IS createdState = (IS) createdKVStates.get(stateDesc.getName());
        if (createdState == null) {
            StateCreateFactory stateCreateFactory = STATE_CREATE_FACTORIES.get(stateDesc.getType());
            if (stateCreateFactory == null) {
                throw new FlinkRuntimeException(stateNotSupportedMessage(stateDesc));
            }
            createdState =
                    stateCreateFactory.createState(stateDesc, stateTable, getKeySerializer());
        } else {
            StateUpdateFactory stateUpdateFactory = STATE_UPDATE_FACTORIES.get(stateDesc.getType());
            if (stateUpdateFactory == null) {
                throw new FlinkRuntimeException(stateNotSupportedMessage(stateDesc));
            }
            createdState = stateUpdateFactory.updateState(stateDesc, stateTable, createdState);
        }

        createdKVStates.put(stateDesc.getName(), createdState);
        return createdState;
    }

    /**
     * Builds the error text used when a descriptor's type has no matching factory.
     *
     * @param stateDesc the descriptor that was rejected
     * @return a message naming the descriptor class and this backend's class
     */
    private <S extends State, SV> String stateNotSupportedMessage(StateDescriptor<S, SV> stateDesc) {
        final Class<?> descriptorClass = stateDesc.getClass();
        final Class<?> backendClass = getClass();
        return String.format("State %s is not supported by %s", descriptorClass, backendClass);
    }

    /**
     * Adapts an element-level snapshot transform factory to the value type of the given state.
     *
     * <p>List descriptors get the factory wrapped in a list transform factory, map descriptors in
     * a map transform factory; for every other descriptor the factory is returned as is.
     *
     * @param stateDesc descriptor whose kind selects the wrapper
     * @param snapshotTransformFactory factory operating on state elements
     * @return a factory typed for the state's value type
     */
    @SuppressWarnings("unchecked")
    private <SV, SEV> StateSnapshotTransformer.StateSnapshotTransformFactory<SV> getStateSnapshotTransformFactory(
            StateDescriptor<?, SV> stateDesc,
            StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        // The relation between SV and SEV depends on the descriptor kind and is not expressed
        // in the type system, so each branch needs an unchecked cast.
        if (stateDesc instanceof ListStateDescriptor) {
            return (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>)
                    new StateSnapshotTransformers.ListStateSnapshotTransformFactory<SEV>(
                            snapshotTransformFactory);
        }
        if (stateDesc instanceof MapStateDescriptor) {
            return (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>)
                    new StateSnapshotTransformers.MapStateSnapshotTransformFactory<>(
                            snapshotTransformFactory);
        }
        return (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>) snapshotTransformFactory;
    }

    /**
     * Starts a snapshot of this backend.
     *
     * <p>A new {@link SnapshotStrategyRunner} is created for each call, wired with the configured
     * checkpoint strategy, the cancel-stream registry and the snapshot execution type, and the
     * call is delegated to it.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @param streamFactory factory for the checkpoint output streams
     * @param checkpointOptions options for this checkpoint
     * @return the future produced by the snapshot runner
     * @throws Exception if the runner fails to set up the snapshot
     */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        // Parameterized rather than raw, so the returned future is type-checked.
        SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner =
                new SnapshotStrategyRunner<>(
                        "Heap backend snapshot",
                        checkpointStrategy,
                        cancelStreamRegistry,
                        snapshotExecutionType);
        return snapshotStrategyRunner.snapshot(
                checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    /**
     * Assembles the resources needed to write a savepoint of this backend.
     *
     * @return savepoint resources wrapping heap snapshot resources built from the registered
     *     key/value and priority-queue states, combined with the snapshot execution type
     */
    @Nonnull
    @Override
    public SavepointResources<K> savepoint() {
        return new SavepointResources<K>(
                HeapSnapshotResources.create(
                        registeredKVStates,
                        priorityQueuesManager.getRegisteredPQStates(),
                        keyGroupCompressionDecorator,
                        keyGroupRange,
                        keySerializer,
                        numberOfKeyGroups),
                snapshotExecutionType);
    }

    /**
     * Checkpoint-completion callback. The body is empty: this backend takes no action here.
     *
     * @param checkpointId id of the completed checkpoint (unused)
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Checkpoint-abort callback. The body is empty: this backend takes no action here.
     *
     * @param checkpointId id of the aborted checkpoint (unused)
     */
    @Override
    public void notifyCheckpointAborted(long checkpointId) {
    }

    /**
     * Invokes {@code function} once for every key that has an entry for the given state and
     * namespace.
     *
     * <p>Before each invocation the key is made the backend's current key. The state object is
     * obtained once from {@code partitionStateFactory} and reused for all keys.
     *
     * @param namespace namespace whose keys are visited
     * @param namespaceSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the state to visit
     * @param function callback receiving each key together with the state object
     * @param partitionStateFactory supplies the state object passed to {@code function}
     * @throws Exception if obtaining the state or the callback fails
     */
    @Override
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function,
            AbstractKeyedStateBackend.PartitionStateFactory partitionStateFactory)
            throws Exception {
        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
            // Materialize the keys before running the callback; presumably this keeps the
            // iteration independent of state changes made by the function (confirm upstream).
            final List<K> keys = keyStream.collect(Collectors.toList());
            final S state =
                    partitionStateFactory.get(namespace, namespaceSerializer, stateDescriptor);
            for (K key : keys) {
                setCurrentKey(key);
                function.process(key, state);
            }
        }
    }

    /** Returns the fixed label {@code "HeapKeyedStateBackend"}. */
    @Override
    public String toString() {
        return "HeapKeyedStateBackend";
    }

    /**
     * Sums {@code size()} over all registered key/value state tables.
     *
     * @return the total across all tables
     */
    @VisibleForTesting
    @Override
    public int numKeyValueStateEntries() {
        return registeredKVStates.values().stream().mapToInt(table -> table.size()).sum();
    }

    /**
     * Sums {@code sizeOfNamespace(namespace)} over all registered key/value state tables.
     *
     * @param namespace namespace passed to each table
     * @return the total across all tables
     */
    @VisibleForTesting
    public int numKeyValueStateEntries(Object namespace) {
        return registeredKVStates.values().stream()
                .mapToInt(table -> table.sizeOfNamespace(namespace))
                .sum();
    }

    /** Returns the local recovery configuration this backend was constructed with. */
    @VisibleForTesting
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return localRecoveryConfig;
    }

    private static interface StateUpdateFactory {
        public <K, N, SV, S extends State, IS extends S> IS updateState(StateDescriptor<S, SV> var1, StateTable<K, N, SV> var2, IS var3) throws Exception;
    }

    private static interface StateCreateFactory {
        public <K, N, SV, S extends State, IS extends S> IS createState(StateDescriptor<S, SV> var1, StateTable<K, N, SV> var2, TypeSerializer<K> var3) throws Exception;
    }
}

