/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.autoscaler.state;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.LoaderOptions;

public class KubernetesAutoScalerStateStore
implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);
    @VisibleForTesting
    protected static final String SCALING_HISTORY_KEY = "scalingHistory";
    @VisibleForTesting
    protected static final String COLLECTED_METRICS_KEY = "collectedMetrics";
    @VisibleForTesting
    protected static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";
    @VisibleForTesting
    protected static final int MAX_CM_BYTES = 1000000;
    protected static final ObjectMapper YAML_MAPPER = new ObjectMapper((JsonFactory)KubernetesAutoScalerStateStore.yamlFactory()).registerModule((Module)new JavaTimeModule()).registerModule((Module)new AutoScalerSerDeModule());
    private final ConfigMapStore configMapStore;

    public KubernetesAutoScalerStateStore(ConfigMapStore configMapStore) {
        this.configMapStore = configMapStore;
    }

    public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
        this.configMapStore.putSerializedState(jobContext, SCALING_HISTORY_KEY, KubernetesAutoScalerStateStore.serializeScalingHistory(scalingHistory));
    }

    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        Optional<String> serializedScalingHistory = this.configMapStore.getSerializedState(jobContext, SCALING_HISTORY_KEY);
        if (serializedScalingHistory.isEmpty()) {
            return new HashMap<JobVertexID, SortedMap<Instant, ScalingSummary>>();
        }
        try {
            return KubernetesAutoScalerStateStore.deserializeScalingHistory(serializedScalingHistory.get());
        }
        catch (JacksonException e) {
            LOG.error("Could not deserialize scaling history, possibly the format changed. Discarding...", (Throwable)e);
            this.configMapStore.removeSerializedState(jobContext, SCALING_HISTORY_KEY);
            return new HashMap<JobVertexID, SortedMap<Instant, ScalingSummary>>();
        }
    }

    public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        this.configMapStore.removeSerializedState(jobContext, SCALING_HISTORY_KEY);
    }

    public void storeCollectedMetrics(KubernetesJobAutoScalerContext jobContext, SortedMap<Instant, CollectedMetrics> metrics) {
        this.configMapStore.putSerializedState(jobContext, COLLECTED_METRICS_KEY, KubernetesAutoScalerStateStore.serializeEvaluatedMetrics(metrics));
    }

    public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(KubernetesJobAutoScalerContext jobContext) {
        Optional<String> serializedEvaluatedMetricsOpt = this.configMapStore.getSerializedState(jobContext, COLLECTED_METRICS_KEY);
        if (serializedEvaluatedMetricsOpt.isEmpty()) {
            return new TreeMap<Instant, CollectedMetrics>();
        }
        try {
            return KubernetesAutoScalerStateStore.deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get());
        }
        catch (JacksonException e) {
            LOG.error("Could not deserialize metric history, possibly the format changed. Discarding...", (Throwable)e);
            this.configMapStore.removeSerializedState(jobContext, COLLECTED_METRICS_KEY);
            return new TreeMap<Instant, CollectedMetrics>();
        }
    }

    public void removeCollectedMetrics(KubernetesJobAutoScalerContext jobContext) {
        this.configMapStore.removeSerializedState(jobContext, COLLECTED_METRICS_KEY);
    }

    public void storeParallelismOverrides(KubernetesJobAutoScalerContext jobContext, Map<String, String> parallelismOverrides) {
        this.configMapStore.putSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY, KubernetesAutoScalerStateStore.serializeParallelismOverrides(parallelismOverrides));
    }

    public Map<String, String> getParallelismOverrides(KubernetesJobAutoScalerContext jobContext) {
        return this.configMapStore.getSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY).map(KubernetesAutoScalerStateStore::deserializeParallelismOverrides).orElse(new HashMap());
    }

    public void removeParallelismOverrides(KubernetesJobAutoScalerContext jobContext) {
        this.configMapStore.removeSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY);
    }

    public void clearAll(KubernetesJobAutoScalerContext jobContext) {
        this.configMapStore.clearAll(jobContext);
    }

    public void flush(KubernetesJobAutoScalerContext jobContext) {
        this.trimHistoryToMaxCmSize(jobContext);
        this.configMapStore.flush(jobContext);
    }

    public void removeInfoFromCache(ResourceID resourceID) {
        this.configMapStore.removeInfoFromCache(resourceID);
    }

    protected static String serializeScalingHistory(Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
        return KubernetesAutoScalerStateStore.compress(YAML_MAPPER.writeValueAsString(scalingHistory));
    }

    private static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> deserializeScalingHistory(String scalingHistory) throws JacksonException {
        return (Map)YAML_MAPPER.readValue(KubernetesAutoScalerStateStore.decompress(scalingHistory), (TypeReference)new TypeReference<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>>(){});
    }

    @VisibleForTesting
    protected static String serializeEvaluatedMetrics(SortedMap<Instant, CollectedMetrics> evaluatedMetrics) {
        return KubernetesAutoScalerStateStore.compress(YAML_MAPPER.writeValueAsString(evaluatedMetrics));
    }

    private static SortedMap<Instant, CollectedMetrics> deserializeEvaluatedMetrics(String evaluatedMetrics) throws JacksonException {
        return (SortedMap)YAML_MAPPER.readValue(KubernetesAutoScalerStateStore.decompress(evaluatedMetrics), (TypeReference)new TypeReference<SortedMap<Instant, CollectedMetrics>>(){});
    }

    private static String serializeParallelismOverrides(Map<String, String> overrides) {
        return (String)ConfigurationUtils.convertValue(overrides, String.class);
    }

    private static Map<String, String> deserializeParallelismOverrides(String overrides) {
        return (Map)ConfigurationUtils.convertValue((Object)overrides, Map.class);
    }

    @VisibleForTesting
    protected void trimHistoryToMaxCmSize(KubernetesJobAutoScalerContext context) {
        int scalingHistorySize = this.configMapStore.getSerializedState(context, SCALING_HISTORY_KEY).map(String::length).orElse(0);
        int metricHistorySize = this.configMapStore.getSerializedState(context, COLLECTED_METRICS_KEY).map(String::length).orElse(0);
        SortedMap<Instant, CollectedMetrics> metricHistory = this.getCollectedMetrics(context);
        while (scalingHistorySize + metricHistorySize > 1000000) {
            if (metricHistory.isEmpty()) {
                return;
            }
            Instant firstKey = metricHistory.firstKey();
            LOG.info("Trimming metric history by removing {}", (Object)firstKey);
            metricHistory.remove(firstKey);
            String compressed = KubernetesAutoScalerStateStore.serializeEvaluatedMetrics(metricHistory);
            this.configMapStore.putSerializedState(context, COLLECTED_METRICS_KEY, compressed);
            metricHistorySize = compressed.length();
        }
    }

    private static String compress(String original) throws IOException {
        ByteArrayOutputStream rstBao = new ByteArrayOutputStream();
        try (GZIPOutputStream zos = new GZIPOutputStream(rstBao);){
            zos.write(original.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(rstBao.toByteArray());
    }

    private static String decompress(String compressed) {
        String string;
        if (compressed == null) {
            return null;
        }
        byte[] bytes = Base64.getDecoder().decode(compressed);
        GZIPInputStream zi = new GZIPInputStream(new ByteArrayInputStream(bytes));
        try {
            string = IOUtils.toString((InputStream)zi, (Charset)StandardCharsets.UTF_8);
        }
        catch (Throwable throwable) {
            try {
                try {
                    zi.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (Exception e) {
                LOG.warn("Error while decompressing scaling data, treating as uncompressed");
                return compressed;
            }
        }
        zi.close();
        return string;
    }

    private static YAMLFactory yamlFactory() {
        LoaderOptions loaderOptions = new LoaderOptions();
        loaderOptions.setCodePointLimit(0x1400000);
        return YAMLFactory.builder().loaderOptions(loaderOptions).build();
    }
}

