/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.ChangelogAggregatingState;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogListState;
import org.apache.flink.state.changelog.ChangelogMapState;
import org.apache.flink.state.changelog.ChangelogReducingState;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.ChangelogValueState;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.KvStateChangeLoggerImpl;
import org.apache.flink.state.changelog.PriorityQueueStateChangeLoggerImpl;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ChangelogKeyedStateBackend<K>
implements CheckpointableKeyedStateBackend<K>,
CheckpointListener,
TestableKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);

    /**
     * Changelog state factories, keyed by the state descriptor type they handle. A lookup that
     * yields {@code null} means this backend has no changelog wrapper for that descriptor type.
     *
     * <p>Each method reference is cast to {@link StateFactory} explicitly: {@code Tuple2.of} is
     * generic in both arguments, so without the cast the method reference has no
     * functional-interface target type and the expression does not type-check.
     */
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
            Stream.of(
                            Tuple2.of(StateDescriptor.Type.VALUE, (StateFactory) ChangelogValueState::create),
                            Tuple2.of(StateDescriptor.Type.LIST, (StateFactory) ChangelogListState::create),
                            Tuple2.of(StateDescriptor.Type.REDUCING, (StateFactory) ChangelogReducingState::create),
                            Tuple2.of(StateDescriptor.Type.AGGREGATING, (StateFactory) ChangelogAggregatingState::create),
                            Tuple2.of(StateDescriptor.Type.MAP, (StateFactory) ChangelogMapState::create))
                    .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
    /** The wrapped backend; most public methods of this class forward to it. */
    private final AbstractKeyedStateBackend<K> keyedStateBackend;
    /** States handed out by {@code getOrCreateKeyedState}, keyed by state descriptor name. */
    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
    /**
     * Changelog state wrappers registered by {@code createInternalState}, keyed by descriptor
     * name. Read by {@code getExistingStateForRecovery} and cleared at the end of
     * {@code completeRestore}.
     */
    private final HashMap<String, ChangelogState> changelogStates;
    /** Priority queues built by {@code create}, keyed by state name. */
    private final HashMap<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName;
    /** Used to initialize descriptor serializers that have not been initialized yet. */
    private final ExecutionConfig executionConfig;
    /** Passed to {@code TtlStateFactory} when a new keyed state is created. */
    private final TtlTimeProvider ttlTimeProvider;
    /** Writer that every change logger created here is given, and that snapshots persist through. */
    private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter;
    /** Id passed to the most recent {@code snapshot} call; {@code -1} before the first one. */
    private long lastCheckpointId = -1L;
    /** State returned by the most recent {@code getPartitionedState} call (paired with {@link #lastName}). */
    private InternalKvState lastState;
    /** Descriptor name belonging to {@link #lastState}; {@code null} until a state was accessed or after dispose. */
    private String lastName;
    /** Receives the state descriptor whenever a state is looked up via the by-name registry or created. */
    private final FunctionDelegationHelper functionDelegationHelper = new FunctionDelegationHelper();
    /** Materialized handles collected from the restored state in {@code completeRestore}. */
    @GuardedBy(value="materialized")
    private final List<KeyedStateHandle> materialized = new ArrayList<KeyedStateHandle>();
    /** Non-materialized (changelog) handles collected from the restored state in {@code completeRestore}. */
    @GuardedBy(value="materialized")
    private final List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<ChangelogStateHandle>();
    /** Lower bound recorded by the latest {@code snapshot}; passed to the writer's confirm/reset. */
    @Nullable
    private SequenceNumber lastUploadedFrom;
    /** Upper bound recorded by the latest {@code snapshot}; passed to the writer's confirm/reset. */
    @Nullable
    private SequenceNumber lastUploadedTo;
    /** Set once from {@code stateChangelogWriter.initialSequenceNumber()}; snapshots persist changes starting here. */
    private final SequenceNumber materializedTo;
    /** Required to be non-null by the constructor; not otherwise used in the code visible in this class. */
    private final MailboxExecutor mainMailboxExecutor;
    /** Required to be non-null by the constructor; not otherwise used in the code visible in this class. */
    private final ExecutorService asyncOperationsThreadPool;

    /**
     * Wraps the given keyed state backend.
     *
     * @param keyedStateBackend backend that operations are forwarded to
     * @param executionConfig used to initialize state descriptor serializers
     * @param ttlTimeProvider passed on when keyed states are created
     * @param stateChangelogWriter writer handed to change loggers; also supplies the initial
     *     sequence number
     * @param initialState restored handles whose materialized and non-materialized parts are
     *     collected; {@code null} elements are skipped
     * @param mainMailboxExecutor must not be {@code null}
     * @param asyncOperationsThreadPool must not be {@code null}
     */
    public ChangelogKeyedStateBackend(
            AbstractKeyedStateBackend<K> keyedStateBackend,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
            Collection<ChangelogStateBackendHandle> initialState,
            MailboxExecutor mainMailboxExecutor,
            ExecutorService asyncOperationsThreadPool) {
        // Collaborators supplied by the caller.
        this.keyedStateBackend = keyedStateBackend;
        this.executionConfig = executionConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.stateChangelogWriter = stateChangelogWriter;

        // By-name registries start out empty.
        this.keyValueStatesByName = new HashMap<>();
        this.priorityQueueStatesByName = new HashMap<>();
        this.changelogStates = new HashMap<>();

        // The statements below may throw; their relative order is kept as is.
        this.materializedTo = stateChangelogWriter.initialSequenceNumber();
        this.mainMailboxExecutor = Preconditions.checkNotNull(mainMailboxExecutor);
        this.asyncOperationsThreadPool = Preconditions.checkNotNull(asyncOperationsThreadPool);

        completeRestore(initialState);
    }

    /**
     * Returns the key group range reported by the wrapped backend.
     *
     * @return the wrapped backend's key group range
     */
    public KeyGroupRange getKeyGroupRange() {
        final KeyGroupRange delegateRange = keyedStateBackend.getKeyGroupRange();
        return delegateRange;
    }

    /**
     * Closes the wrapped backend.
     *
     * @throws IOException if the wrapped backend fails to close
     */
    public void close() throws IOException {
        keyedStateBackend.close();
    }

    /**
     * Forwards the new current key to the wrapped backend.
     *
     * @param newKey key to make current
     */
    public void setCurrentKey(K newKey) {
        keyedStateBackend.setCurrentKey(newKey);
    }

    /**
     * Returns the current key as reported by the wrapped backend.
     *
     * @return the wrapped backend's current key
     */
    public K getCurrentKey() {
        final K delegateKey = keyedStateBackend.getCurrentKey();
        return delegateKey;
    }

    /**
     * Returns the key serializer of the wrapped backend.
     *
     * @return the wrapped backend's key serializer
     */
    public TypeSerializer<K> getKeySerializer() {
        final TypeSerializer<K> delegateSerializer = keyedStateBackend.getKeySerializer();
        return delegateSerializer;
    }

    /**
     * Asks the wrapped backend for the keys of the given state and namespace.
     *
     * @param state name of the state
     * @param namespace namespace to look in
     * @return the stream returned by the wrapped backend
     */
    public <N> Stream<K> getKeys(String state, N namespace) {
        final Stream<K> delegateKeys = keyedStateBackend.getKeys(state, namespace);
        return delegateKeys;
    }

    /**
     * Asks the wrapped backend for the key/namespace pairs of the given state.
     *
     * @param state name of the state
     * @return the stream returned by the wrapped backend
     */
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        final Stream<Tuple2<K, N>> delegatePairs = keyedStateBackend.getKeysAndNamespaces(state);
        return delegatePairs;
    }

    /**
     * Disposes the wrapped backend and drops every state reference held by this wrapper.
     *
     * <p>Besides the last-accessed-state cache and the keyed-state registry, the changelog state
     * and priority queue registries are cleared as well, so that no state object of the disposed
     * backend stays reachable through this instance. The cleanup runs in a {@code finally}
     * block, so it also happens when the wrapped backend's {@code dispose()} throws; the
     * exception still propagates to the caller.
     */
    public void dispose() {
        try {
            this.keyedStateBackend.dispose();
        } finally {
            this.lastName = null;
            this.lastState = null;
            this.keyValueStatesByName.clear();
            this.changelogStates.clear();
            this.priorityQueueStatesByName.clear();
        }
    }

    /**
     * Registers the listener with the wrapped backend.
     *
     * @param listener listener to register
     */
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        keyedStateBackend.registerKeySelectionListener(listener);
    }

    /**
     * Deregisters the listener from the wrapped backend.
     *
     * @param listener listener to remove
     * @return whatever the wrapped backend reports for the removal
     */
    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        final boolean delegateResult = keyedStateBackend.deregisterKeySelectionListener(listener);
        return delegateResult;
    }

    /**
     * Lets the wrapped backend apply {@code function} to all keys, but has it obtain the state
     * through this wrapper's {@code getPartitionedState}.
     *
     * @param namespace namespace to operate in
     * @param namespaceSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the state to visit
     * @param function function to apply
     * @throws Exception propagated from the wrapped backend
     */
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function)
            throws Exception {
        keyedStateBackend.applyToAllKeys(
                namespace,
                namespaceSerializer,
                stateDescriptor,
                function,
                this::getPartitionedState);
    }

    /**
     * Returns the state for the descriptor with its current namespace set to {@code namespace}.
     *
     * <p>Lookup proceeds in three steps: the most recently returned state (matched by descriptor
     * name), then the by-name registry, and finally {@code getOrCreateKeyedState}. The first
     * step returns without notifying the function delegation helper; the other two notify it.
     *
     * @param namespace namespace to select; must not be {@code null}
     * @param namespaceSerializer serializer for the namespace, used when the state is created
     * @param stateDescriptor descriptor identifying the state
     * @return the state, with the given namespace selected
     * @throws Exception if creating the state fails
     */
    public <N, S extends State> S getPartitionedState(
            N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespace, "Namespace");
        final String requestedName = stateDescriptor.getName();

        // 1) Same state as last time: only the namespace needs switching.
        final boolean sameAsLast = lastName != null && lastName.equals(requestedName);
        if (sameAsLast) {
            lastState.setCurrentNamespace(namespace);
            return (S) lastState;
        }

        // 2) Already registered under this name: make it the cached state.
        final InternalKvState<K, ?, ?> registered = keyValueStatesByName.get(requestedName);
        if (registered != null) {
            lastState = registered;
            lastState.setCurrentNamespace(namespace);
            lastName = requestedName;
            functionDelegationHelper.addOrUpdate(stateDescriptor);
            return (S) registered;
        }

        // 3) Unknown so far: create it, then cache it.
        final S created = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        final InternalKvState createdKvState = (InternalKvState) created;
        lastName = requestedName;
        lastState = createdKvState;
        createdKvState.setCurrentNamespace(namespace);
        return created;
    }

    /**
     * Starts a snapshot by asking the changelog writer to persist changes.
     *
     * <p>Records the checkpoint id together with the range {@code materializedTo} ..
     * {@code lastAppendedSequenceNumber().next()} so that a later completion or abort
     * notification for the same checkpoint can confirm or reset that range. The writer is asked
     * to persist starting at the recorded lower bound; its result is turned into a snapshot
     * result by {@code buildSnapshotResult}.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp not used by this method
     * @param streamFactory not used by this method
     * @param checkpointOptions not used by this method
     * @return a future view over the writer's persist operation
     * @throws Exception declared by the interface
     */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        lastCheckpointId = checkpointId;
        lastUploadedFrom = materializedTo;
        lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next();
        LOG.debug(
                "snapshot for checkpoint {}, change range: {}..{}",
                checkpointId,
                lastUploadedFrom,
                lastUploadedTo);

        final CompletableFuture<SnapshotResult<KeyedStateHandle>> pendingResult =
                stateChangelogWriter.persist(lastUploadedFrom).thenApply(this::buildSnapshotResult);
        return toRunnableFuture(pendingResult);
    }

    /**
     * Combines the restored handles with the freshly persisted delta into a snapshot result.
     *
     * <p>Runs while holding the monitor of {@code materialized}, which guards both restored
     * handle lists.
     *
     * @param delta handle produced by the writer; ignored when {@code null} or of size zero
     * @return an empty result when there are neither materialized nor non-materialized handles,
     *     otherwise a result wrapping a changelog state backend handle
     */
    private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta) {
        synchronized (materialized) {
            final List<ChangelogStateHandle> nonMaterialized = new ArrayList<>(restoredNonMaterialized);
            final boolean deltaHasContent = delta != null && delta.getStateSize() > 0L;
            if (deltaHasContent) {
                nonMaterialized.add(delta);
            }

            final boolean nothingToReport = nonMaterialized.isEmpty() && materialized.isEmpty();
            if (nothingToReport) {
                return SnapshotResult.empty();
            }
            return SnapshotResult.of(
                    new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
                            materialized, nonMaterialized, getKeyGroupRange()));
        }
    }

    /**
     * Returns the changelog priority queue registered under {@code stateName}, creating and
     * registering it on first use.
     *
     * <p>A new queue wraps the queue created by the wrapped backend and is given a change logger
     * built on this backend's changelog writer and the wrapped backend's key context.
     *
     * @param stateName name the queue is registered under
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @return the existing or newly created queue
     */
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        // The registry stores queues with a wildcard element type; the element type of an entry
        // is only known to the caller, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        ChangelogKeyGroupedPriorityQueue<T> queue =
                (ChangelogKeyGroupedPriorityQueue<T>) this.priorityQueueStatesByName.get(stateName);
        if (queue == null) {
            PriorityQueueStateChangeLoggerImpl<K, T> priorityQueueStateChangeLogger =
                    new PriorityQueueStateChangeLoggerImpl<>(
                            byteOrderedElementSerializer,
                            this.keyedStateBackend.getKeyContext(),
                            this.stateChangelogWriter,
                            new RegisteredPriorityQueueStateBackendMetaInfo<>(
                                    stateName, byteOrderedElementSerializer));
            queue =
                    new ChangelogKeyGroupedPriorityQueue<>(
                            this.keyedStateBackend.create(stateName, byteOrderedElementSerializer),
                            priorityQueueStateChangeLogger,
                            byteOrderedElementSerializer);
            this.priorityQueueStatesByName.put(stateName, queue);
        }
        return queue;
    }

    /**
     * Returns the number of key/value state entries reported by the wrapped backend.
     *
     * @return the wrapped backend's entry count
     */
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        final int delegateCount = keyedStateBackend.numKeyValueStateEntries();
        return delegateCount;
    }

    /**
     * Forwards the question to the wrapped backend.
     *
     * @param checkpointOptions checkpoint type to ask about
     * @return the wrapped backend's answer
     */
    public boolean isStateImmutableInStateBackend(CheckpointType checkpointOptions) {
        final boolean delegateAnswer = keyedStateBackend.isStateImmutableInStateBackend(checkpointOptions);
        return delegateAnswer;
    }

    /**
     * Obtains savepoint resources from the wrapped backend.
     *
     * @return the wrapped backend's savepoint resources
     * @throws Exception propagated from the wrapped backend
     */
    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        final SavepointResources<K> delegateResources = keyedStateBackend.savepoint();
        return delegateResources;
    }

    /**
     * Handles a checkpoint-complete notification.
     *
     * <p>When the id matches the checkpoint of the most recent {@code snapshot} call, the range
     * recorded by that call is confirmed with the changelog writer. The notification is then
     * forwarded to the wrapped backend in every case.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception propagated from the wrapped backend
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        final boolean isLatestSnapshot = checkpointId == lastCheckpointId;
        if (isLatestSnapshot) {
            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo);
        }
        keyedStateBackend.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Handles a checkpoint-aborted notification.
     *
     * <p>When the id matches the checkpoint of the most recent {@code snapshot} call, the range
     * recorded by that call is reset on the changelog writer. The notification is then forwarded
     * to the wrapped backend in every case.
     *
     * @param checkpointId id of the aborted checkpoint
     * @throws Exception propagated from the wrapped backend
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        final boolean isLatestSnapshot = checkpointId == lastCheckpointId;
        if (isLatestSnapshot) {
            stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
        }
        keyedStateBackend.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Returns the state registered under the descriptor's name, creating it if necessary.
     *
     * <p>On creation the descriptor's serializer is initialized if needed, the state is built
     * through {@code TtlStateFactory} and {@code LatencyTrackingStateFactory}, stored in the
     * by-name registry and passed to the wrapped backend's
     * {@code publishQueryableStateIfEnabled}. The function delegation helper is notified whether
     * or not the state already existed.
     *
     * @param namespaceSerializer serializer for the namespace; must not be {@code null}
     * @param stateDescriptor descriptor identifying the state
     * @return the existing or newly created state
     * @throws Exception if creating the state fails
     */
    public <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
        Preconditions.checkNotNull(getKeySerializer(), "State key serializer has not been configured in the config. This operation cannot use partitioned state.");

        final String stateName = stateDescriptor.getName();
        InternalKvState kvState = keyValueStatesByName.get(stateName);
        if (kvState != null) {
            functionDelegationHelper.addOrUpdate(stateDescriptor);
            return (S) kvState;
        }

        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }
        // This wrapper (not the wrapped backend) is handed to the TTL factory as the backend.
        final InternalKvState ttlAwareState =
                (InternalKvState)
                        TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                                namespaceSerializer, stateDescriptor, (KeyedStateBackend) this, ttlTimeProvider);
        kvState =
                LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(
                        ttlAwareState,
                        stateDescriptor,
                        keyedStateBackend.getLatencyTrackingStateConfig());
        keyValueStatesByName.put(stateName, kvState);
        keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, kvState);

        functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S) kvState;
    }

    /**
     * Creates the state in the wrapped backend and wraps it into its changelog counterpart.
     *
     * <p>The wrapper is produced by the factory registered for the descriptor's type, is given a
     * change logger built on this backend's changelog writer, and is registered under the
     * descriptor's name so that {@code getExistingStateForRecovery} can find it.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state to create
     * @param snapshotTransformFactory forwarded to the wrapped backend and recorded in the
     *     state's meta info
     * @return the changelog state wrapper
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's type
     * @throws Exception if the wrapped backend or the factory fails
     */
    @Nonnull
    @SuppressWarnings("unchecked")
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getType());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        // The meta info is parameterized by the state value type, so the transform factory has
        // to be viewed as one over SV (unchecked).
        RegisteredStateMetaInfoBase meta =
                new RegisteredKeyValueStateBackendMetaInfo<>(
                        stateDesc.getType(),
                        stateDesc.getName(),
                        namespaceSerializer,
                        stateDesc.getSerializer(),
                        (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>) snapshotTransformFactory);
        InternalKvState<K, N, SV> state =
                this.keyedStateBackend.createInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
        KvStateChangeLogger<SV, N> kvStateChangeLogger =
                new KvStateChangeLoggerImpl<>(
                        state.getKeySerializer(),
                        state.getNamespaceSerializer(),
                        state.getValueSerializer(),
                        this.keyedStateBackend.getKeyContext(),
                        this.stateChangelogWriter,
                        meta,
                        stateDesc.getTtlConfig(),
                        stateDesc.getDefaultValue());
        // The wrapped backend itself is passed as the key context of the changelog state.
        IS is = stateFactory.create(state, kvStateChangeLogger, this.keyedStateBackend);
        this.changelogStates.put(stateDesc.getName(), (ChangelogState) is);
        return is;
    }

    /**
     * Collects the restored handles and resets the changelog state registry.
     *
     * <p>For every non-{@code null} handle, its materialized and non-materialized parts are
     * appended to the corresponding lists while holding the monitor of {@code materialized}.
     * The changelog state registry is cleared afterwards, whether or not any handle was given.
     *
     * @param stateHandles restored handles; may contain {@code null} elements
     */
    private void completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) {
        if (!stateHandles.isEmpty()) {
            synchronized (materialized) {
                for (ChangelogStateBackendHandle handle : stateHandles) {
                    if (handle != null) {
                        materialized.addAll(handle.getMaterializedStateHandles());
                        restoredNonMaterialized.addAll(handle.getNonMaterializedStateHandles());
                    }
                }
            }
        }
        changelogStates.clear();
    }

    /**
     * Forwards the request to the wrapped backend.
     *
     * @param recursive passed through unchanged
     * @return the backend returned by the wrapped backend
     */
    public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
        final KeyedStateBackend<K> delegated = keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
        return delegated;
    }

    /**
     * Looks up an already registered changelog state by name.
     *
     * <p>{@code KEY_VALUE} states are taken from the changelog state registry,
     * {@code PRIORITY_QUEUE} states from the priority queue registry.
     *
     * @param name name the state was registered under
     * @param type kind of state to look for
     * @return the registered state
     * @throws NoSuchElementException if nothing is registered under {@code name} for that type
     * @throws UnsupportedOperationException if {@code type} is neither of the two kinds above
     */
    public ChangelogState getExistingStateForRecovery(String name, StateMetaInfoSnapshot.BackendStateType type) throws NoSuchElementException, UnsupportedOperationException {
        final ChangelogState existing;
        switch (type) {
            case KEY_VALUE:
                existing = changelogStates.get(name);
                break;
            case PRIORITY_QUEUE:
                existing = priorityQueueStatesByName.get(name);
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unknown state type %s (%s)", type, name));
        }
        if (existing != null) {
            return existing;
        }
        throw new NoSuchElementException(String.format("%s state %s not found", type, name));
    }

    /**
     * Exposes a {@link CompletableFuture} through the {@link RunnableFuture} interface.
     *
     * <p>{@code run()} joins the future; every other method forwards to the same-named method
     * of the future.
     *
     * @param f future to adapt
     * @return a view over {@code f}
     */
    private static <T> RunnableFuture<T> toRunnableFuture(final CompletableFuture<T> f) {
        final class CompletableFutureView implements RunnableFuture<T> {

            @Override
            public void run() {
                f.join();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return f.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return f.isCancelled();
            }

            @Override
            public boolean isDone() {
                return f.isDone();
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                return f.get();
            }

            @Override
            public T get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return f.get(timeout, unit);
            }
        }

        return new CompletableFutureView();
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> var1, KvStateChangeLogger<SV, N> var2, InternalKeyContext<K> var3) throws Exception;
    }
}

