/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScalingMetricEvaluator {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);

    public EvaluatedMetrics evaluate(Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime) {
        LOG.debug("Restart time used in metrics evaluation: {}", (Object)restartTime);
        HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
        SortedMap<Instant, CollectedMetrics> metricsHistory = collectedMetrics.getMetricHistory();
        JobTopology topology = collectedMetrics.getJobTopology();
        boolean processingBacklog = ScalingMetricEvaluator.isProcessingBacklog(topology, metricsHistory, conf);
        for (JobVertexID vertex : topology.getVerticesInTopologicalOrder()) {
            scalingOutput.put(vertex, this.evaluateMetrics(conf, scalingOutput, metricsHistory, topology, vertex, processingBacklog, restartTime));
        }
        Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics = ScalingMetricEvaluator.evaluateGlobalMetrics(metricsHistory);
        return new EvaluatedMetrics(scalingOutput, globalMetrics);
    }

    @VisibleForTesting
    protected static boolean isProcessingBacklog(JobTopology topology, SortedMap<Instant, CollectedMetrics> metricsHistory, Configuration conf) {
        Map<JobVertexID, Map<ScalingMetric, Double>> lastMetrics = ((CollectedMetrics)metricsHistory.get(metricsHistory.lastKey())).getVertexMetrics();
        return topology.getVerticesInTopologicalOrder().stream().filter(topology::isSource).anyMatch(vertex -> {
            double lag = ((Map)lastMetrics.get(vertex)).getOrDefault((Object)ScalingMetric.LAG, 0.0);
            double inputRateAvg = ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
            if (Double.isNaN(inputRateAvg)) {
                return false;
            }
            double lagSeconds = lag / inputRateAvg;
            if (lagSeconds > (double)((Duration)conf.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds()) {
                LOG.info("Currently processing backlog at source {}", vertex);
                return true;
            }
            return false;
        });
    }

    @Nonnull
    private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(Configuration conf, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput, SortedMap<Instant, CollectedMetrics> metricsHistory, JobTopology topology, JobVertexID vertex, boolean processingBacklog, Duration restartTime) {
        Map<ScalingMetric, Double> latestVertexMetrics = ((CollectedMetrics)metricsHistory.get(metricsHistory.lastKey())).getVertexMetrics().get(vertex);
        VertexInfo vertexInfo = topology.get(vertex);
        double inputRateAvg = ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
        HashMap<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
        this.computeTargetDataRate(topology, vertex, conf, inputRateAvg, scalingOutput, metricsHistory, latestVertexMetrics, evaluatedMetrics);
        double busyTimeAvg = ScalingMetricEvaluator.computeBusyTimeAvg(conf, metricsHistory, vertex, vertexInfo.getParallelism());
        evaluatedMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, EvaluatedScalingMetric.avg(ScalingMetricEvaluator.computeTrueProcessingRate(busyTimeAvg, inputRateAvg, metricsHistory, vertex, conf)));
        evaluatedMetrics.put(ScalingMetric.LOAD, EvaluatedScalingMetric.avg(busyTimeAvg / 1000.0));
        Optional.ofNullable(latestVertexMetrics.get((Object)ScalingMetric.LAG)).ifPresent(l -> evaluatedMetrics.put(ScalingMetric.LAG, EvaluatedScalingMetric.of(l)));
        evaluatedMetrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getParallelism()));
        evaluatedMetrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
        evaluatedMetrics.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions()));
        ScalingMetricEvaluator.computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
        return evaluatedMetrics;
    }

    @VisibleForTesting
    protected static double computeBusyTimeAvg(Configuration conf, SortedMap<Instant, CollectedMetrics> metricsHistory, JobVertexID vertex, int parallelism) {
        if (conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR) == MetricAggregator.AVG) {
            return ScalingMetricEvaluator.getRate(ScalingMetric.ACCUMULATED_BUSY_TIME, vertex, metricsHistory) / (double)parallelism;
        }
        return ScalingMetricEvaluator.getAverage(ScalingMetric.LOAD, vertex, metricsHistory) * 1000.0;
    }

    protected static double computeTrueProcessingRate(double busyTimeAvg, double inputRateAvg, SortedMap<Instant, CollectedMetrics> metricsHistory, JobVertexID vertex, Configuration conf) {
        double observedTprAvg;
        double busyTimeTpr = ScalingMetricEvaluator.computeTprFromBusyTime(busyTimeAvg, inputRateAvg);
        ScalingMetric tprMetric = ScalingMetricEvaluator.selectTprMetric(vertex, conf, busyTimeTpr, observedTprAvg = ScalingMetricEvaluator.getAverage(ScalingMetric.OBSERVED_TPR, vertex, metricsHistory, (Integer)conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)));
        return tprMetric == ScalingMetric.OBSERVED_TPR ? observedTprAvg : busyTimeTpr;
    }

    private static double computeTprFromBusyTime(double busyMsPerSecond, double rate) {
        if (rate == 0.0) {
            return Double.POSITIVE_INFINITY;
        }
        return rate / (busyMsPerSecond / 1000.0);
    }

    private static ScalingMetric selectTprMetric(JobVertexID jobVertexID, Configuration conf, double busyTimeTprAvg, double observedTprAvg) {
        if (Double.isNaN(observedTprAvg)) {
            return ScalingMetric.TRUE_PROCESSING_RATE;
        }
        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
            return ScalingMetric.OBSERVED_TPR;
        }
        double switchThreshold = (Double)conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD);
        if (busyTimeTprAvg > observedTprAvg * (1.0 + switchThreshold)) {
            LOG.debug("Using observed tpr {} for {} as busy time based seems too large ({})", new Object[]{observedTprAvg, jobVertexID, busyTimeTprAvg});
            return ScalingMetric.OBSERVED_TPR;
        }
        LOG.debug("Using busy time based tpr {} for {}.", (Object)busyTimeTprAvg, (Object)jobVertexID);
        return ScalingMetric.TRUE_PROCESSING_RATE;
    }

    @VisibleForTesting
    protected static void computeProcessingRateThresholds(Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf, boolean processingBacklog, Duration restartTime) {
        double lowerUtilization;
        double upperUtilization;
        double targetUtilization = (Double)conf.get(AutoScalerOptions.UTILIZATION_TARGET);
        double utilizationBoundary = (Double)conf.get(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
        if (processingBacklog) {
            upperUtilization = 1.0;
            lowerUtilization = 0.0;
        } else {
            upperUtilization = conf.getOptional(AutoScalerOptions.UTILIZATION_MAX).orElse(targetUtilization + utilizationBoundary);
            lowerUtilization = conf.getOptional(AutoScalerOptions.UTILIZATION_MIN).orElse(targetUtilization - utilizationBoundary);
        }
        double scaleUpThreshold = AutoScalerUtils.getTargetProcessingCapacity(metrics, conf, upperUtilization, false, restartTime);
        double scaleDownThreshold = AutoScalerUtils.getTargetProcessingCapacity(metrics, conf, lowerUtilization, true, restartTime);
        metrics.put(ScalingMetric.SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleUpThreshold));
        metrics.put(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleDownThreshold));
    }

    private void computeTargetDataRate(JobTopology topology, JobVertexID vertex, Configuration conf, double inputRate, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated, SortedMap<Instant, CollectedMetrics> metricsHistory, Map<ScalingMetric, Double> latestVertexMetrics, Map<ScalingMetric, EvaluatedScalingMetric> out) {
        if (topology.isSource(vertex)) {
            double catchUpInputRate;
            double catchUpTargetSec = ((Duration)conf.get(AutoScalerOptions.CATCH_UP_DURATION)).toSeconds();
            double lagRate = ScalingMetricEvaluator.getRate(ScalingMetric.LAG, vertex, metricsHistory);
            double ingestionDataRate = Math.max(0.0, inputRate + lagRate);
            if (Double.isNaN(ingestionDataRate)) {
                throw new RuntimeException("Cannot evaluate metrics without ingestion rate information");
            }
            out.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(ingestionDataRate));
            double lag = latestVertexMetrics.getOrDefault((Object)ScalingMetric.LAG, 0.0);
            double d = catchUpInputRate = catchUpTargetSec == 0.0 ? 0.0 : lag / catchUpTargetSec;
            if (catchUpInputRate > 0.0) {
                LOG.debug("Extra backlog processing input rate for {} is {}", (Object)vertex, (Object)catchUpInputRate);
            }
            out.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate));
        } else {
            Set<JobVertexID> inputs = topology.get(vertex).getInputs().keySet();
            double sumAvgTargetRate = 0.0;
            double sumCatchUpDataRate = 0.0;
            for (JobVertexID inputVertex : inputs) {
                Map<ScalingMetric, EvaluatedScalingMetric> inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
                EvaluatedScalingMetric inputTargetRate = inputEvaluatedMetrics.get((Object)ScalingMetric.TARGET_DATA_RATE);
                double outputRatio = ScalingMetricEvaluator.computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
                LOG.debug("Computed output ratio for edge ({} -> {}) : {}", new Object[]{inputVertex, vertex, outputRatio});
                sumAvgTargetRate += inputTargetRate.getAverage() * outputRatio;
                sumCatchUpDataRate += inputEvaluatedMetrics.get((Object)ScalingMetric.CATCH_UP_DATA_RATE).getCurrent() * outputRatio;
            }
            out.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(sumAvgTargetRate));
            out.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate));
        }
    }

    @VisibleForTesting
    protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(SortedMap<Instant, CollectedMetrics> metricHistory) {
        Map<ScalingMetric, Double> latest = ((CollectedMetrics)metricHistory.get(metricHistory.lastKey())).getGlobalMetrics();
        HashMap<ScalingMetric, EvaluatedScalingMetric> out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
        Double gcPressure = latest.getOrDefault((Object)ScalingMetric.GC_PRESSURE, Double.NaN);
        out.put(ScalingMetric.GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
        ScalingMetricEvaluator.populateMetric(ScalingMetric.HEAP_MAX_USAGE_RATIO, metricHistory, out);
        ScalingMetricEvaluator.populateMetric(ScalingMetric.HEAP_MEMORY_USED, metricHistory, out);
        ScalingMetricEvaluator.populateMetric(ScalingMetric.MANAGED_MEMORY_USED, metricHistory, out);
        ScalingMetricEvaluator.populateMetric(ScalingMetric.METASPACE_MEMORY_USED, metricHistory, out);
        out.put(ScalingMetric.NUM_TASK_SLOTS_USED, EvaluatedScalingMetric.of(latest.getOrDefault((Object)ScalingMetric.NUM_TASK_SLOTS_USED, Double.NaN)));
        return out;
    }

    private static void populateMetric(ScalingMetric scalingMetric, SortedMap<Instant, CollectedMetrics> metricHistory, Map<ScalingMetric, EvaluatedScalingMetric> out) {
        Map<ScalingMetric, Double> latestMetrics = ((CollectedMetrics)metricHistory.get(metricHistory.lastKey())).getGlobalMetrics();
        Double latestObservation = latestMetrics.getOrDefault((Object)scalingMetric, Double.NaN);
        double value = ScalingMetricEvaluator.getAverageGlobalMetric(scalingMetric, metricHistory);
        out.put(scalingMetric, new EvaluatedScalingMetric(latestObservation, value));
    }

    private static double getAverageGlobalMetric(ScalingMetric metric, SortedMap<Instant, CollectedMetrics> metricsHistory) {
        return ScalingMetricEvaluator.getAverage(metric, null, metricsHistory);
    }

    public static double getAverage(ScalingMetric metric, @Nullable JobVertexID jobVertexId, SortedMap<Instant, CollectedMetrics> metricsHistory) {
        return ScalingMetricEvaluator.getAverage(metric, jobVertexId, metricsHistory, 1);
    }

    public static double getRate(ScalingMetric metric, @Nullable JobVertexID jobVertexId, SortedMap<Instant, CollectedMetrics> metricsHistory) {
        Instant firstTs = null;
        double first = Double.NaN;
        Instant lastTs = null;
        double last = Double.NaN;
        for (Map.Entry<Instant, CollectedMetrics> entry : metricsHistory.entrySet()) {
            double value = entry.getValue().getVertexMetrics().get(jobVertexId).getOrDefault((Object)metric, Double.NaN);
            if (Double.isNaN(value)) continue;
            if (Double.isNaN(first)) {
                first = value;
                firstTs = entry.getKey();
                continue;
            }
            last = value;
            lastTs = entry.getKey();
        }
        if (Double.isNaN(last)) {
            return Double.NaN;
        }
        return 1000.0 * (last - first) / (double)Duration.between(firstTs, lastTs).toMillis();
    }

    public static double getAverage(ScalingMetric metric, @Nullable JobVertexID jobVertexId, SortedMap<Instant, CollectedMetrics> metricsHistory, int minElements) {
        double sum = 0.0;
        int n = 0;
        boolean anyInfinite = false;
        for (CollectedMetrics collectedMetrics : metricsHistory.values()) {
            Map<ScalingMetric, Double> metrics = jobVertexId != null ? collectedMetrics.getVertexMetrics().get(jobVertexId) : collectedMetrics.getGlobalMetrics();
            double num = metrics.getOrDefault((Object)metric, Double.NaN);
            if (Double.isNaN(num)) continue;
            if (Double.isInfinite(num)) {
                anyInfinite = true;
                continue;
            }
            sum += num;
            ++n;
        }
        if (n == 0) {
            return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
        }
        return n < minElements ? Double.NaN : sum / (double)n;
    }

    @VisibleForTesting
    protected static double computeEdgeOutputRatio(JobVertexID from, JobVertexID to, JobTopology topology, SortedMap<Instant, CollectedMetrics> metricsHistory) {
        double outputRate;
        double inputRate = ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_IN, from, metricsHistory);
        double outputRatio = 0.0;
        if (inputRate > 0.0 && (outputRate = ScalingMetricEvaluator.computeEdgeDataRate(topology, metricsHistory, from, to)) > 0.0) {
            outputRatio = outputRate / inputRate;
        }
        return outputRatio;
    }

    @VisibleForTesting
    protected static double computeEdgeDataRate(JobTopology topology, SortedMap<Instant, CollectedMetrics> metricsHistory, JobVertexID from, JobVertexID to) {
        Set<JobVertexID> toVertexInputs = topology.get(to).getInputs().keySet();
        if (toVertexInputs.size() == 1) {
            LOG.debug("Computing edge ({}, {}) data rate for single input downstream task", (Object)from, (Object)to);
            return ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_IN, to, metricsHistory);
        }
        double inputRateFromOtherVertices = 0.0;
        for (JobVertexID input : toVertexInputs) {
            if (input.equals((Object)from)) continue;
            if (topology.get(input).getOutputs().size() == 1) {
                inputRateFromOtherVertices += ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_OUT, input, metricsHistory);
                continue;
            }
            inputRateFromOtherVertices = Double.NaN;
            break;
        }
        if (!Double.isNaN(inputRateFromOtherVertices)) {
            LOG.debug("Computing edge ({}, {}) data rate by subtracting upstream input rates", (Object)from, (Object)to);
            return ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_IN, to, metricsHistory) - inputRateFromOtherVertices;
        }
        LOG.debug("Computing edge ({}, {}) data rate by falling back to from num records out", (Object)from, (Object)to);
        return ScalingMetricEvaluator.getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory);
    }
}

