/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.util.StateHandleStoreUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesStateHandleStore<T extends Serializable>
implements StateHandleStore<T, StringResourceVersion> {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class);
    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final RetrievableStateStorageHelper<T> storage;
    private final Predicate<String> configMapKeyFilter;
    @Nullable
    private final String lockIdentity;

    private static <T extends Serializable> StateHandleWithDeleteMarker<T> deserializeStateHandle(String content) throws IOException {
        Preconditions.checkNotNull((Object)content, (String)"Content should not be null.");
        byte[] data = Base64.getDecoder().decode(content);
        try {
            return (StateHandleWithDeleteMarker)((Object)StateHandleStoreUtils.deserialize((byte[])data));
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IOException(String.format("Failed to deserialize state handle from ConfigMap data %s.", content), e);
        }
    }

    private static String toBase64(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    @VisibleForTesting
    static String serializeStateHandle(StateHandleWithDeleteMarker<?> stateHandle) throws IOException {
        return KubernetesStateHandleStore.toBase64(InstantiationUtil.serializeObject(stateHandle));
    }

    public KubernetesStateHandleStore(FlinkKubeClient kubeClient, String configMapName, RetrievableStateStorageHelper<T> storage, Predicate<String> configMapKeyFilter, @Nullable String lockIdentity) {
        this.kubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)kubeClient, (String)"Kubernetes client");
        this.storage = (RetrievableStateStorageHelper)Preconditions.checkNotNull(storage, (String)"State storage");
        this.configMapName = (String)Preconditions.checkNotNull((Object)configMapName, (String)"ConfigMap name");
        this.configMapKeyFilter = (Predicate)Preconditions.checkNotNull(configMapKeyFilter);
        this.lockIdentity = lockIdentity;
    }

    public RetrievableStateHandle<T> addAndLock(String key, T state) throws PossibleInconsistentStateException, Exception {
        Preconditions.checkNotNull((Object)key, (String)"Key in ConfigMap.");
        Preconditions.checkNotNull(state, (String)"State.");
        RetrievableStateHandle storeHandle = this.storage.store(state);
        byte[] serializedStoreHandle = StateHandleStoreUtils.serializeOrDiscard(new StateHandleWithDeleteMarker(storeHandle));
        boolean discardState = true;
        try {
            discardState = this.updateConfigMap(cm -> {
                try {
                    return this.addEntry((KubernetesConfigMap)cm, key, serializedStoreHandle);
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }).get() == false;
            RetrievableStateHandle retrievableStateHandle = storeHandle;
            return retrievableStateHandle;
        }
        catch (Exception ex) {
            Optional possibleInconsistentStateException = ExceptionUtils.findThrowable((Throwable)ex, PossibleInconsistentStateException.class);
            if (possibleInconsistentStateException.isPresent()) {
                discardState = false;
                throw (PossibleInconsistentStateException)possibleInconsistentStateException.get();
            }
            throw (StateHandleStore.AlreadyExistException)ExceptionUtils.findThrowable((Throwable)ex, StateHandleStore.AlreadyExistException.class).orElseThrow(() -> ex);
        }
        finally {
            if (discardState) {
                storeHandle.discardState();
            }
        }
    }

    public void replace(String key, StringResourceVersion resourceVersion, T state) throws Exception {
        Preconditions.checkNotNull((Object)key, (String)"Key in ConfigMap.");
        Preconditions.checkNotNull(state, (String)"State.");
        RetrievableStateHandle newStateHandle = this.storage.store(state);
        byte[] serializedStateHandle = StateHandleStoreUtils.serializeOrDiscard(new StateHandleWithDeleteMarker(newStateHandle));
        boolean discardOldState = false;
        boolean discardNewState = true;
        AtomicReference oldStateHandleRef = new AtomicReference();
        try {
            boolean success;
            discardOldState = success = this.updateConfigMap(cm -> {
                try {
                    return this.replaceEntry((KubernetesConfigMap)cm, key, serializedStateHandle, oldStateHandleRef);
                }
                catch (StateHandleStore.NotExistException e) {
                    throw new CompletionException(e);
                }
            }).get().booleanValue();
            discardNewState = !success;
        }
        catch (Exception ex) {
            Optional possibleInconsistentStateException = ExceptionUtils.findThrowable((Throwable)ex, PossibleInconsistentStateException.class);
            if (possibleInconsistentStateException.isPresent()) {
                discardNewState = false;
                throw (PossibleInconsistentStateException)possibleInconsistentStateException.get();
            }
            throw (StateHandleStore.NotExistException)((Object)ExceptionUtils.findThrowable((Throwable)ex, StateHandleStore.NotExistException.class).orElseThrow(() -> ex));
        }
        finally {
            if (discardNewState) {
                newStateHandle.discardState();
            }
            if (discardOldState) {
                ((RetrievableStateHandle)Objects.requireNonNull(oldStateHandleRef.get(), "state handle should have been set on success")).discardState();
            }
        }
    }

    public StringResourceVersion exists(String key) throws Exception {
        Preconditions.checkNotNull((Object)key, (String)"Key in ConfigMap.");
        return this.kubeClient.getConfigMap(this.configMapName).map(configMap -> {
            String content = configMap.getData().get(key);
            if (content != null) {
                try {
                    StateHandleWithDeleteMarker stateHandle = KubernetesStateHandleStore.deserializeStateHandle(content);
                    if (stateHandle.isMarkedForDeletion()) {
                        return StringResourceVersion.notExisting();
                    }
                }
                catch (IOException e) {
                    return StringResourceVersion.notExisting();
                }
                return StringResourceVersion.valueOf((String)configMap.getResourceVersion());
            }
            return StringResourceVersion.notExisting();
        }).orElseThrow(this::getConfigMapNotExistException);
    }

    public RetrievableStateHandle<T> getAndLock(String key) throws Exception {
        Preconditions.checkNotNull((Object)key, (String)"Key in ConfigMap.");
        Optional<KubernetesConfigMap> optional = this.kubeClient.getConfigMap(this.configMapName);
        if (optional.isPresent()) {
            KubernetesConfigMap configMap = optional.get();
            if (configMap.getData().containsKey(key)) {
                StateHandleWithDeleteMarker<T> result = KubernetesStateHandleStore.deserializeStateHandle(configMap.getData().get(key));
                if (result.isMarkedForDeletion()) {
                    throw this.getKeyMarkedAsDeletedException(key);
                }
                return result.getInner();
            }
            throw this.getKeyNotExistException(key);
        }
        throw this.getConfigMapNotExistException();
    }

    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() {
        return this.kubeClient.getConfigMap(this.configMapName).map(configMap -> {
            ArrayList stateHandles = new ArrayList();
            configMap.getData().entrySet().stream().filter(entry -> this.configMapKeyFilter.test((String)entry.getKey())).forEach(entry -> {
                try {
                    StateHandleWithDeleteMarker<T> result = KubernetesStateHandleStore.deserializeStateHandle((String)entry.getValue());
                    if (!result.isMarkedForDeletion()) {
                        stateHandles.add(new Tuple2(result.getInner(), entry.getKey()));
                    }
                }
                catch (IOException e) {
                    LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.", (Object)this.configMapName, entry.getKey());
                }
            });
            return stateHandles;
        }).orElse(Collections.emptyList());
    }

    public Collection<String> getAllHandles() throws Exception {
        return this.kubeClient.getConfigMap(this.configMapName).map(configMap -> configMap.getData().keySet().stream().filter(this.configMapKeyFilter).filter(k -> {
            try {
                String content = Objects.requireNonNull(configMap.getData().get(k));
                return !KubernetesStateHandleStore.deserializeStateHandle(content).isMarkedForDeletion();
            }
            catch (IOException e) {
                return false;
            }
        }).collect(Collectors.toList())).orElseThrow(this::getConfigMapNotExistException);
    }

    public boolean releaseAndTryRemove(String key) throws Exception {
        Preconditions.checkNotNull((Object)key, (String)"Key in ConfigMap.");
        AtomicReference stateHandleRefer = new AtomicReference();
        AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false);
        return (Boolean)((CompletableFuture)this.updateConfigMap(configMap -> {
            String content = configMap.getData().get(key);
            if (content != null) {
                try {
                    StateHandleWithDeleteMarker<T> result = KubernetesStateHandleStore.deserializeStateHandle(content);
                    if (!result.isMarkedForDeletion()) {
                        configMap.getData().put(key, KubernetesStateHandleStore.serializeStateHandle(result.toDeleting()));
                    }
                    stateHandleRefer.set(result.getInner());
                }
                catch (IOException e) {
                    KubernetesStateHandleStore.logInvalidEntry(key, this.configMapName, e);
                    Objects.requireNonNull(configMap.getData().remove(key));
                }
                return Optional.of(configMap);
            }
            stateHandleDoesNotExist.set(true);
            return Optional.empty();
        }).thenCompose(updated -> {
            if (updated.booleanValue() && stateHandleRefer.get() != null) {
                try {
                    ((RetrievableStateHandle)stateHandleRefer.get()).discardState();
                    return this.updateConfigMap(configMap -> {
                        configMap.getData().remove(key);
                        return Optional.of(configMap);
                    });
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }
            return CompletableFuture.completedFuture(stateHandleDoesNotExist.get() || updated != false);
        })).get();
    }

    public void clearEntries() throws Exception {
        this.updateConfigMap(configMap -> {
            configMap.getData().keySet().removeIf(this.configMapKeyFilter);
            return Optional.of(configMap);
        }).get();
    }

    public void release(String name) {
    }

    public void releaseAll() {
    }

    public String toString() {
        return this.getClass().getSimpleName() + "{configMapName='" + this.configMapName + "'}";
    }

    private boolean isValidOperation(KubernetesConfigMap c) {
        return this.lockIdentity == null || KubernetesLeaderElector.hasLeadership(c, this.lockIdentity);
    }

    @VisibleForTesting
    CompletableFuture<Boolean> updateConfigMap(Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> updateFn) {
        return this.kubeClient.checkAndUpdateConfigMap(this.configMapName, configMap -> {
            if (this.isValidOperation((KubernetesConfigMap)configMap)) {
                return (Optional)updateFn.apply((KubernetesConfigMap)configMap);
            }
            return Optional.empty();
        });
    }

    private Optional<KubernetesConfigMap> addEntry(KubernetesConfigMap configMap, String key, byte[] serializedStateHandle) throws Exception {
        String newBase64Content;
        block6: {
            String oldBase64Content = configMap.getData().get(key);
            newBase64Content = KubernetesStateHandleStore.toBase64(serializedStateHandle);
            if (oldBase64Content != null) {
                try {
                    StateHandleWithDeleteMarker<T> stateHandle = KubernetesStateHandleStore.deserializeStateHandle(oldBase64Content);
                    if (stateHandle.isMarkedForDeletion()) {
                        if (!this.releaseAndTryRemove(key)) {
                            throw new IllegalStateException("Unable to remove the marked as deleting entry.");
                        }
                        break block6;
                    }
                    if (oldBase64Content.equals(newBase64Content)) {
                        return Optional.of(configMap);
                    }
                    throw this.getKeyAlreadyExistException(key);
                }
                catch (IOException e) {
                    KubernetesStateHandleStore.logInvalidEntry(key, this.configMapName, e);
                }
            }
        }
        configMap.getData().put(key, newBase64Content);
        return Optional.of(configMap);
    }

    private Optional<KubernetesConfigMap> replaceEntry(KubernetesConfigMap configMap, String key, byte[] serializedStateHandle, AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef) throws StateHandleStore.NotExistException {
        String content = configMap.getData().get(key);
        if (content != null) {
            block5: {
                try {
                    StateHandleWithDeleteMarker<T> stateHandle = KubernetesStateHandleStore.deserializeStateHandle(content);
                    oldStateHandleRef.set(stateHandle.getInner());
                    if (!stateHandle.isMarkedForDeletion()) break block5;
                    StateHandleStore.NotExistException exception = this.getKeyNotExistException(key);
                    try {
                        this.releaseAndTryRemove(key);
                    }
                    catch (Exception e) {
                        exception.addSuppressed((Throwable)e);
                    }
                    throw exception;
                }
                catch (IOException e) {
                    KubernetesStateHandleStore.logInvalidEntry(key, this.configMapName, e);
                }
            }
            configMap.getData().put(key, KubernetesStateHandleStore.toBase64(serializedStateHandle));
            return Optional.of(configMap);
        }
        throw this.getKeyNotExistException(key);
    }

    private KubernetesException getConfigMapNotExistException() {
        return new KubernetesException("ConfigMap " + this.configMapName + " does not exists. It may be deleted externally.");
    }

    private StateHandleStore.NotExistException getKeyNotExistException(String key) {
        return new StateHandleStore.NotExistException("Could not find " + key + " in ConfigMap " + this.configMapName);
    }

    private StateHandleStore.NotExistException getKeyMarkedAsDeletedException(String key) {
        return new StateHandleStore.NotExistException("Already marked for deletion " + key + " in ConfigMap " + this.configMapName);
    }

    private StateHandleStore.AlreadyExistException getKeyAlreadyExistException(String key) {
        return new StateHandleStore.AlreadyExistException(key + " already exists in ConfigMap " + this.configMapName);
    }

    private static void logInvalidEntry(String key, String configMapName, Throwable e) {
        LOG.warn("Could not retrieve the state handle of '{}' from ConfigMap '{}'. Removing the entry as we don't have any way to recover.", new Object[]{key, configMapName, e});
    }

    @VisibleForTesting
    static class StateHandleWithDeleteMarker<T extends Serializable>
    implements StateObject {
        private final RetrievableStateHandle<T> inner;
        private final boolean markedForDeletion;

        StateHandleWithDeleteMarker(RetrievableStateHandle<T> inner) {
            this(inner, false);
        }

        private StateHandleWithDeleteMarker(RetrievableStateHandle<T> inner, boolean markedForDeletion) {
            this.inner = inner;
            this.markedForDeletion = markedForDeletion;
        }

        public void discardState() throws Exception {
            this.inner.discardState();
        }

        public long getStateSize() {
            return this.inner.getStateSize();
        }

        public void collectSizeStats(StateObject.StateObjectSizeStatsCollector collector) {
            this.inner.collectSizeStats(collector);
        }

        RetrievableStateHandle<T> getInner() {
            return this.inner;
        }

        boolean isMarkedForDeletion() {
            return this.markedForDeletion;
        }

        StateHandleWithDeleteMarker<T> toDeleting() {
            return new StateHandleWithDeleteMarker<T>(this.inner, true);
        }
    }
}

