/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler.metrics;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.math3.util.Precision;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScalingMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetrics.class);

    public static void computeLoadMetrics(JobVertexID jobVertexID, Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics, IOMetrics ioMetrics, Configuration conf) {
        scalingMetrics.put(ScalingMetric.LOAD, ScalingMetrics.getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 1000.0);
        scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, ioMetrics.getAccumulatedBusyTime());
    }

    private static double getBusyTimeMsPerSecond(Map<FlinkMetric, AggregatedMetric> flinkMetrics, Configuration conf, JobVertexID jobVertexId) {
        MetricAggregator busyTimeAggregator = (MetricAggregator)((Object)conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR));
        double busyTimeMsPerSecond = busyTimeAggregator.get(flinkMetrics.get((Object)FlinkMetric.BUSY_TIME_PER_SEC));
        if (!Double.isFinite(busyTimeMsPerSecond)) {
            if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
                LOG.warn("No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.", (Object)jobVertexId);
            }
            return Double.NaN;
        }
        return Math.max(0.0, busyTimeMsPerSecond);
    }

    public static void computeDataRateMetrics(JobVertexID jobVertexID, Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics, JobTopology topology, Configuration conf, Supplier<Double> observedTprAvg) {
        double numRecordsInPerSecond;
        boolean isSource = topology.isSource(jobVertexID);
        IOMetrics ioMetrics = topology.get(jobVertexID).getIoMetrics();
        double numRecordsIn = ScalingMetrics.getNumRecordsInAccumulated(flinkMetrics, ioMetrics, jobVertexID, isSource);
        scalingMetrics.put(ScalingMetric.NUM_RECORDS_IN, numRecordsIn);
        scalingMetrics.put(ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(ioMetrics.getNumRecordsOut()));
        if (isSource && !Double.isNaN(numRecordsInPerSecond = ScalingMetrics.getSourceNumRecordsInPerSecond(flinkMetrics, jobVertexID))) {
            Double observedTprOpt = ScalingMetrics.getObservedTpr(flinkMetrics, scalingMetrics, numRecordsInPerSecond, conf).orElseGet(observedTprAvg);
            scalingMetrics.put(ScalingMetric.OBSERVED_TPR, observedTprOpt);
        }
    }

    private static Optional<Double> getObservedTpr(Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics, double numRecordsInPerSecond, Configuration conf) {
        boolean catchingUp;
        if (numRecordsInPerSecond == 0.0) {
            return Optional.of(Double.POSITIVE_INFINITY);
        }
        boolean bl = catchingUp = scalingMetrics.getOrDefault((Object)ScalingMetric.LAG, 0.0) >= (double)((Duration)conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)).toSeconds() * numRecordsInPerSecond;
        if (!catchingUp) {
            return Optional.empty();
        }
        double observedTpr = ScalingMetrics.computeObservedTprWithBackpressure(numRecordsInPerSecond, flinkMetrics.get((Object)FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
        return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr);
    }

    public static double computeObservedTprWithBackpressure(double numRecordsInPerSecond, double backpressureMsPerSeconds) {
        if (backpressureMsPerSeconds >= 1000.0) {
            return Double.NaN;
        }
        double nonBackpressuredRate = 1.0 - backpressureMsPerSeconds / 1000.0;
        return numRecordsInPerSecond / nonBackpressuredRate;
    }

    public static Map<ScalingMetric, Double> computeGlobalMetrics(Map<FlinkMetric, Metric> collectedJmMetrics, Map<FlinkMetric, AggregatedMetric> collectedTmMetrics, Configuration conf) {
        AggregatedMetric metaspaceMemory;
        AggregatedMetric managedMemory;
        if (collectedTmMetrics == null) {
            return null;
        }
        HashMap<ScalingMetric, Double> out = new HashMap<ScalingMetric, Double>();
        try {
            Double numTotalTaskSlots = Double.valueOf(collectedJmMetrics.get((Object)FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
            Double numTaskSlotsAvailable = Double.valueOf(collectedJmMetrics.get((Object)FlinkMetric.NUM_TASK_SLOTS_AVAILABLE).getValue());
            out.put(ScalingMetric.NUM_TASK_SLOTS_USED, numTotalTaskSlots - numTaskSlotsAvailable);
        }
        catch (Exception e) {
            LOG.debug("Slot metrics and registered task managers not available");
        }
        AggregatedMetric gcTime = collectedTmMetrics.get((Object)FlinkMetric.TOTAL_GC_TIME_PER_SEC);
        if (gcTime != null) {
            out.put(ScalingMetric.GC_PRESSURE, gcTime.getMax() / 1000.0);
        }
        AggregatedMetric heapMax = collectedTmMetrics.get((Object)FlinkMetric.HEAP_MEMORY_MAX);
        AggregatedMetric heapUsed = collectedTmMetrics.get((Object)FlinkMetric.HEAP_MEMORY_USED);
        if (heapMax != null && heapUsed != null) {
            out.put(ScalingMetric.HEAP_MEMORY_USED, heapUsed.getMax());
            out.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, heapUsed.getMax() / heapMax.getMax());
        }
        if ((managedMemory = collectedTmMetrics.get((Object)FlinkMetric.MANAGED_MEMORY_USED)) != null) {
            out.put(ScalingMetric.MANAGED_MEMORY_USED, managedMemory.getMax());
        }
        if ((metaspaceMemory = collectedTmMetrics.get((Object)FlinkMetric.METASPACE_MEMORY_USED)) != null) {
            out.put(ScalingMetric.METASPACE_MEMORY_USED, metaspaceMemory.getMax());
        }
        return out;
    }

    public static void computeLagMetrics(Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics) {
        AggregatedMetric pendingRecords = flinkMetrics.get((Object)FlinkMetric.PENDING_RECORDS);
        if (pendingRecords != null) {
            scalingMetrics.put(ScalingMetric.LAG, pendingRecords.getSum());
        } else {
            scalingMetrics.put(ScalingMetric.LAG, 0.0);
        }
    }

    private static double getSourceNumRecordsInPerSecond(Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID jobVertexID) {
        return ScalingMetrics.getNumRecordsInInternal(flinkMetrics, null, jobVertexID, true, true);
    }

    private static double getNumRecordsInAccumulated(Map<FlinkMetric, AggregatedMetric> flinkMetrics, IOMetrics ioMetrics, JobVertexID jobVertexID, boolean isSource) {
        return ScalingMetrics.getNumRecordsInInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, false);
    }

    private static double getNumRecordsInInternal(Map<FlinkMetric, AggregatedMetric> flinkMetrics, IOMetrics ioMetrics, JobVertexID jobVertexID, boolean isSource, boolean perSecond) {
        AggregatedMetric numRecords;
        AggregatedMetric aggregatedMetric = numRecords = perSecond ? null : new AggregatedMetric("n", Double.NaN, Double.NaN, Double.NaN, Double.valueOf(ioMetrics.getNumRecordsIn()), Double.NaN);
        if (isSource && (numRecords == null || numRecords.getSum() == 0.0)) {
            AggregatedMetric sourceTaskIn = flinkMetrics.get((Object)(perSecond ? FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC : FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN));
            AggregatedMetric aggregatedMetric2 = numRecords = sourceTaskIn != null ? sourceTaskIn : numRecords;
        }
        if (isSource && (numRecords == null || numRecords.getSum() == 0.0)) {
            AggregatedMetric sourceTaskOut = flinkMetrics.get((Object)(perSecond ? FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC : FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT));
            AggregatedMetric aggregatedMetric3 = numRecords = sourceTaskOut != null ? sourceTaskOut : numRecords;
        }
        if (numRecords == null) {
            LOG.debug("Received null input rate for {}. Returning NaN.", (Object)jobVertexID);
            return Double.NaN;
        }
        return Math.max(0.0, numRecords.getSum());
    }

    public static double roundMetric(double value) {
        double rounded = Precision.round((double)value, (int)3);
        return rounded == 0.0 ? value : rounded;
    }
}

