/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingMetricEvaluator;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that queries Flink metrics for a job and maintains, per job key, a time-ordered
 * history of {@link CollectedMetrics}.
 *
 * <p>Subclasses provide the actual metric queries ({@link #queryAllAggregatedMetrics}, {@link
 * #queryJmMetrics} and {@link #queryTmMetrics}); this class takes care of building the {@link
 * JobTopology}, selecting the metric names to query, converting the raw values into scaling
 * metrics and keeping the history inside the configured metric window.
 *
 * @param <KEY> type of the key identifying a job
 * @param <Context> type of the autoscaler context of a job
 */
public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerContext<KEY>> {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);

    /**
     * Matches the per-partition metric names of Kafka and Pulsar sources. Exactly one pair of named
     * groups ({@code kafkaTopic}/{@code kafkaId} or {@code pulsarTopic}/{@code pulsarId}) is set on
     * a match. Compiled once because the expression never changes.
     */
    private static final Pattern SOURCE_PARTITION_PATTERN =
            Pattern.compile(
                    "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                            + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

    /** Wrapper text expected in front of the JSON plan string reported by the job details. */
    private static final String RAW_JSON_PREFIX = "RawJson{json='";

    /** Wrapper text expected after the JSON plan string reported by the job details. */
    private static final String RAW_JSON_SUFFIX = "'}";

    /** Cache of the metric names selected per job and vertex. */
    private final Map<KEY, Map<JobVertexID, Map<String, FlinkMetric>>> availableVertexMetricNames =
            new ConcurrentHashMap<>();

    /** In-memory metric history per job, loaded from the state store on first access. */
    private final Map<KEY, SortedMap<Instant, CollectedMetrics>> histories =
            new ConcurrentHashMap<>();

    /** Per-job flag maintained by subclasses; this class only removes entries on cleanup. */
    protected final Map<KEY, Boolean> jobsWithGcMetrics = new ConcurrentHashMap<>();

    private Clock clock = Clock.systemDefaultZone();

    /**
     * Collects a new metric sample for the job, appends it to the history and persists the history
     * in the state store.
     *
     * <p>If the job switched to RUNNING after the oldest sample in the history, the history (both
     * in memory and in the state store) is cleared first. Once the metric window is full, samples
     * older than the window are dropped and the result is marked as fully collected.
     *
     * @param ctx context of the job to collect metrics for
     * @param stateStore store used to load and persist the metric history
     * @return the topology, the updated history and the time the job switched to RUNNING
     * @throws Exception if any of the queries or state store operations fail
     */
    public CollectedMetricHistory updateMetrics(
            Context ctx, AutoScalerStateStore<KEY, Context> stateStore) throws Exception {
        KEY jobKey = ctx.getJobKey();
        Configuration conf = ctx.getConfiguration();
        Instant now = clock.instant();

        SortedMap<Instant, CollectedMetrics> metricHistory =
                histories.computeIfAbsent(
                        jobKey,
                        k -> {
                            try {
                                return stateStore.getCollectedMetrics(ctx);
                            } catch (Exception e) {
                                throw new RuntimeException("Get evaluated metrics failed.", e);
                            }
                        });

        JobDetailsInfo jobDetailsInfo =
                getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
        Instant jobRunningTs = getJobRunningTs(jobDetailsInfo);

        if (!metricHistory.isEmpty() && jobRunningTs.isAfter(metricHistory.firstKey())) {
            LOG.info("Job updated at {}. Clearing metrics.", DateTimeUtils.readable(jobRunningTs));
            stateStore.removeCollectedMetrics(ctx);
            // cleanup() drops the cached history entry, so from here on metricHistory is only
            // referenced locally; it is persisted below and re-loaded by the next invocation.
            cleanup(jobKey);
            metricHistory.clear();
        }

        JobTopology topology = getJobTopology(ctx, stateStore, jobDetailsInfo);

        Instant stableTime = jobRunningTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        boolean isStabilizing = now.isBefore(stableTime);

        // Must be evaluated before the new sample is added: only samples taken after the
        // stabilization period count towards the window.
        Duration metricWindowSize = getMetricWindowSize(conf);
        Instant windowFullTime =
                getWindowFullTime(metricHistory.tailMap(stableTime), now, metricWindowSize);

        Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames =
                queryFilteredMetricNames(ctx, topology, isStabilizing);
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedVertexMetrics =
                queryAllAggregatedMetrics(ctx, filteredVertexMetricNames);
        Map<FlinkMetric, Metric> collectedJmMetrics = queryJmMetrics(ctx);
        Map<FlinkMetric, AggregatedMetric> collectedTmMetrics = queryTmMetrics(ctx);

        CollectedMetrics scalingMetrics =
                convertToScalingMetrics(
                        jobKey,
                        collectedVertexMetrics,
                        collectedJmMetrics,
                        collectedTmMetrics,
                        topology,
                        conf);
        metricHistory.put(now, scalingMetrics);

        CollectedMetricHistory collectedMetrics =
                new CollectedMetricHistory(topology, metricHistory, jobRunningTs);

        if (now.isBefore(windowFullTime)) {
            if (isStabilizing) {
                LOG.info("Stabilizing until {}", DateTimeUtils.readable(stableTime));
            } else {
                LOG.info(
                        "Metric window is not full until {}. {} samples collected so far",
                        DateTimeUtils.readable(windowFullTime),
                        metricHistory.size());
            }
        } else {
            collectedMetrics.setFullyCollected(true);
            Instant trimBefore = now.minus(metricWindowSize);
            int numDropped = removeMetricsBefore(trimBefore, metricHistory);
            LOG.debug(
                    "Metric window is now full. Dropped {} samples before {}, keeping {}.",
                    numDropped,
                    DateTimeUtils.readable(trimBefore),
                    metricHistory.size());
        }

        stateStore.storeCollectedMetrics(ctx, metricHistory);
        return collectedMetrics;
    }

    /**
     * Removes all samples strictly older than the cut-off from the history.
     *
     * @return the number of removed samples
     */
    private int removeMetricsBefore(
            Instant cutOffTimestamp, SortedMap<Instant, CollectedMetrics> metricHistory) {
        // headMap is a live view: clearing it removes the entries from the backing history.
        SortedMap<Instant, CollectedMetrics> toBeDropped = metricHistory.headMap(cutOffTimestamp);
        int numDropped = toBeDropped.size();
        toBeDropped.clear();
        return numDropped;
    }

    /** Queries the JM-level metrics of the job. */
    protected abstract Map<FlinkMetric, Metric> queryJmMetrics(Context var1) throws Exception;

    /** Queries the aggregated TM-level metrics of the job. */
    protected abstract Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context var1)
            throws Exception;

    /** Returns the metric window size configured via {@link AutoScalerOptions#METRICS_WINDOW}. */
    protected Duration getMetricWindowSize(Configuration conf) {
        return conf.get(AutoScalerOptions.METRICS_WINDOW);
    }

    /**
     * Computes when the metric window is full: one window size after the first sample taken after
     * stabilization, or after {@code now} if there is no such sample yet.
     */
    private static Instant getWindowFullTime(
            SortedMap<Instant, CollectedMetrics> metricsAfterStable,
            Instant now,
            Duration metricWindowSize) {
        Instant windowStart = metricsAfterStable.isEmpty() ? now : metricsAfterStable.firstKey();
        return windowStart.plus(metricWindowSize);
    }

    /**
     * Returns the time at which the job switched to {@link JobStatus#RUNNING}.
     *
     * @throws IllegalStateException if the job details carry no RUNNING timestamp
     */
    @VisibleForTesting
    protected Instant getJobRunningTs(JobDetailsInfo jobDetailsInfo) {
        Long runningTs = jobDetailsInfo.getTimestamps().get(JobStatus.RUNNING);
        Preconditions.checkState(
                runningTs != null, "Unable to find when the job was switched to RUNNING.");
        return Instant.ofEpochMilli(runningTs);
    }

    /**
     * Builds the job topology from the job details, records the current vertex set in the scaling
     * history, updates the partition counts of sources and excludes finished vertices from
     * scaling.
     */
    protected JobTopology getJobTopology(
            Context ctx, AutoScalerStateStore<KEY, Context> stateStore, JobDetailsInfo jobDetailsInfo)
            throws Exception {
        JobTopology topology = getJobTopology(jobDetailsInfo);
        Set<JobVertexID> vertexSet = Set.copyOf(topology.getVerticesInTopologicalOrder());
        ScalingHistoryUtils.updateVertexList(stateStore, ctx, clock.instant(), vertexSet);
        updateKafkaPulsarSourceNumPartitions(ctx, jobDetailsInfo.getJobId(), topology);
        AutoScalerUtils.excludeVerticesFromScaling(
                ctx.getConfiguration(), topology.getFinishedVertices());
        return topology;
    }

    /**
     * Builds the job topology purely from the given job details: slot sharing groups, max
     * parallelism, IO metrics and the set of finished vertices per job vertex, plus the JSON plan.
     */
    @VisibleForTesting
    protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
        Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap =
                jobDetailsInfo.getJobVertexInfos().stream()
                        // Collectors.toMap rejects null values, so skip vertices without a group.
                        .filter(e -> e.getSlotSharingGroupId() != null)
                        .collect(
                                Collectors.toMap(
                                        JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                        JobDetailsInfo.JobVertexDetailsInfo
                                                ::getSlotSharingGroupId));
        Map<JobVertexID, Integer> maxParallelismMap =
                jobDetailsInfo.getJobVertexInfos().stream()
                        .collect(
                                Collectors.toMap(
                                        JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                        JobDetailsInfo.JobVertexDetailsInfo::getMaxParallelism));

        // The plan string is expected to be wrapped as RawJson{json='...'}; strip the wrapper.
        String rawPlan = jobDetailsInfo.getJsonPlan();
        String json =
                rawPlan.substring(
                        RAW_JSON_PREFIX.length(), rawPlan.length() - RAW_JSON_SUFFIX.length());

        Map<JobVertexID, IOMetrics> metrics = new HashMap<>();
        Set<JobVertexID> finished = new HashSet<>();
        for (JobDetailsInfo.JobVertexDetailsInfo vertexDetails :
                jobDetailsInfo.getJobVertexInfos()) {
            if (vertexDetails.getExecutionState() == ExecutionState.FINISHED) {
                finished.add(vertexDetails.getJobVertexID());
            }
            metrics.put(
                    vertexDetails.getJobVertexID(),
                    IOMetrics.from(vertexDetails.getJobVertexMetrics()));
        }

        return JobTopology.fromJsonPlan(
                json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
    }

    /**
     * For every vertex without inputs, counts the distinct Kafka/Pulsar partitions visible in its
     * aggregated metric names and, if any were found, stores the count on the vertex.
     */
    private void updateKafkaPulsarSourceNumPartitions(
            Context ctx, JobID jobId, JobTopology topology) throws Exception {
        try (RestClusterClient<String> restClient = ctx.getRestClusterClient()) {
            for (VertexInfo vertexInfo : topology.getVertexInfos().values()) {
                if (!vertexInfo.getInputs().isEmpty()) {
                    // Not a source vertex.
                    continue;
                }
                JobVertexID sourceVertex = vertexInfo.getId();
                int numPartitions =
                        queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
                                .map(ScalingMetricCollector::toPartitionKey)
                                .filter(Objects::nonNull)
                                .collect(Collectors.toSet())
                                .size();
                if (numPartitions > 0) {
                    LOG.debug(
                            "Updating source {} max parallelism based on available partitions to {}",
                            sourceVertex,
                            numPartitions);
                    topology.get(sourceVertex).setNumSourcePartitions(numPartitions);
                }
            }
        }
    }

    /**
     * Maps a metric name to a {@code <topic>-<partitionId>} key if it is a per-partition Kafka or
     * Pulsar source metric.
     *
     * @return the partition key, or {@code null} if the name is not a per-partition source metric
     */
    private static String toPartitionKey(String metricName) {
        Matcher matcher = SOURCE_PARTITION_PATTERN.matcher(metricName);
        if (!matcher.matches()) {
            return null;
        }
        String kafkaTopic = matcher.group("kafkaTopic");
        String kafkaId = matcher.group("kafkaId");
        String pulsarTopic = matcher.group("pulsarTopic");
        String pulsarId = matcher.group("pulsarId");
        return kafkaTopic != null ? kafkaTopic + "-" + kafkaId : pulsarTopic + "-" + pulsarId;
    }

    /**
     * Converts the raw Flink metrics of every vertex into scaling metrics and combines them with
     * the global metrics computed from the JM and TM metrics.
     *
     * <p>Finished vertices get {@link FlinkMetric#FINISHED_METRICS} instead of queried values. All
     * vertex metric values are rounded via {@link ScalingMetrics#roundMetric}.
     */
    private CollectedMetrics convertToScalingMetrics(
            KEY jobKey,
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
            Map<FlinkMetric, Metric> collectedJmMetrics,
            Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
            JobTopology jobTopology,
            Configuration conf) {
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> metricsByVertex = collectedMetrics;
        Set<JobVertexID> finishedVertices = jobTopology.getFinishedVertices();
        if (!finishedVertices.isEmpty()) {
            // Work on a copy so the caller's map is left untouched.
            metricsByVertex = new HashMap<>(collectedMetrics);
            for (JobVertexID finishedVertex : finishedVertices) {
                metricsByVertex.put(finishedVertex, FlinkMetric.FINISHED_METRICS);
            }
        }

        // Identical for every vertex, so looked up once.
        SortedMap<Instant, CollectedMetrics> metricHistory =
                histories.getOrDefault(jobKey, Collections.emptySortedMap());
        int minObservations =
                conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS);

        Map<JobVertexID, Map<ScalingMetric, Double>> out = new HashMap<>();
        for (Map.Entry<JobVertexID, Map<FlinkMetric, AggregatedMetric>> entry :
                metricsByVertex.entrySet()) {
            JobVertexID jobVertexID = entry.getKey();
            Map<FlinkMetric, AggregatedMetric> vertexFlinkMetrics = entry.getValue();
            LOG.debug(
                    "Calculating vertex scaling metrics for {} from {}",
                    jobVertexID,
                    vertexFlinkMetrics);

            Map<ScalingMetric, Double> vertexScalingMetrics = new HashMap<>();
            out.put(jobVertexID, vertexScalingMetrics);

            if (jobTopology.isSource(jobVertexID)) {
                ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
            }
            ScalingMetrics.computeLoadMetrics(
                    jobVertexID,
                    vertexFlinkMetrics,
                    vertexScalingMetrics,
                    jobTopology.get(jobVertexID).getIoMetrics(),
                    conf);
            ScalingMetrics.computeDataRateMetrics(
                    jobVertexID,
                    vertexFlinkMetrics,
                    vertexScalingMetrics,
                    jobTopology,
                    conf,
                    observedTprAvg(jobVertexID, metricHistory, minObservations));

            vertexScalingMetrics.replaceAll((metric, value) -> ScalingMetrics.roundMetric(value));
            LOG.debug("Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
        }

        Map<ScalingMetric, Double> globalMetrics =
                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, collectedTmMetrics, conf);
        LOG.debug("Global metrics: {}", globalMetrics);
        return new CollectedMetrics(out, globalMetrics);
    }

    /**
     * Returns a supplier that, when invoked, averages {@link ScalingMetric#OBSERVED_TPR} of the
     * vertex over the given history.
     */
    private static Supplier<Double> observedTprAvg(
            JobVertexID jobVertexID,
            SortedMap<Instant, CollectedMetrics> metricHistory,
            int minObservations) {
        return () ->
                ScalingMetricEvaluator.getAverage(
                        ScalingMetric.OBSERVED_TPR, jobVertexID, metricHistory, minObservations);
    }

    /**
     * Queries the filtered metric names; while the job is still stabilizing, a missing required
     * metric is reported as {@link NotReadyException} instead of {@link MetricNotFoundException}.
     */
    private Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
            Context ctx, JobTopology topology, boolean isStabilizing) {
        try {
            return queryFilteredMetricNames(ctx, topology);
        } catch (MetricNotFoundException e) {
            if (isStabilizing) {
                throw new NotReadyException(e);
            }
            throw e;
        }
    }

    /**
     * Returns the metric names to query per vertex, using and updating the per-job cache.
     *
     * <p>If the cached vertex set equals the vertex set of the topology, only the source vertices
     * are queried again and merged over the cached names; otherwise all vertices are queried.
     * Finished vertices are removed from the result. The returned map is the cached instance, so
     * that removal is reflected in the cache as well.
     */
    protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
            Context ctx, JobTopology topology) {
        List<JobVertexID> vertices = topology.getVerticesInTopologicalOrder();

        Map<JobVertexID, Map<String, FlinkMetric>> names =
                availableVertexMetricNames.compute(
                        ctx.getJobKey(),
                        (k, previousMetricNames) -> {
                            boolean sameVertices =
                                    previousMetricNames != null
                                            && previousMetricNames
                                                    .keySet()
                                                    .equals(topology.getVertexInfos().keySet());
                            if (!sameVertices) {
                                return queryFilteredMetricNames(ctx, topology, vertices.stream());
                            }
                            Map<JobVertexID, Map<String, FlinkMetric>> newMetricNames =
                                    new HashMap<>(previousMetricNames);
                            newMetricNames.putAll(
                                    queryFilteredMetricNames(
                                            ctx,
                                            topology,
                                            vertices.stream().filter(topology::isSource)));
                            return newMetricNames;
                        });

        names.keySet().removeAll(topology.getFinishedVertices());
        return names;
    }

    /**
     * Queries the filtered metric names of the given vertices, skipping finished ones, using a
     * single REST client that is closed afterwards.
     */
    private Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
            Context ctx, JobTopology topology, Stream<JobVertexID> vertexStream) {
        // NOTE(review): the other getRestClusterClient() call sites in this class live in methods
        // declaring "throws Exception". If the call declares a checked exception, this method
        // relied on it propagating undeclared - confirm before compiling from source.
        try (RestClusterClient<String> restClient = ctx.getRestClusterClient()) {
            return vertexStream
                    .filter(v -> !topology.getFinishedVertices().contains(v))
                    .collect(
                            Collectors.toMap(
                                    v -> v,
                                    v ->
                                            getFilteredVertexMetricNames(
                                                    restClient, ctx.getJobID(), v, topology)));
        }
    }

    /**
     * Selects, from all aggregated metric names of a vertex, the names that need to be queried and
     * maps each of them to the {@link FlinkMetric} it represents.
     *
     * <p>{@link FlinkMetric#BUSY_TIME_PER_SEC} is required for every vertex; sources additionally
     * require backpressure time and the source-task records-in metrics. Pending records and
     * source-task records-out are optional for sources.
     *
     * @throws MetricNotFoundException if a required metric is not among the available names
     */
    Map<String, FlinkMetric> getFilteredVertexMetricNames(
            RestClusterClient<?> restClient,
            JobID jobID,
            JobVertexID jobVertexID,
            JobTopology topology) {
        Collection<String> allMetricNames =
                queryAggregatedMetricNames(restClient, jobID, jobVertexID);
        Map<String, FlinkMetric> filteredMetrics = new HashMap<>();
        Set<FlinkMetric> requiredMetrics = new HashSet<>();

        requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);

        if (topology.isSource(jobVertexID)) {
            requiredMetrics.add(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
            requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
            requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN);

            // Optional: a missing pendingRecords metric is tolerated and only logged.
            List<String> pendingRecordsMetric = FlinkMetric.PENDING_RECORDS.findAll(allMetricNames);
            if (pendingRecordsMetric.isEmpty()) {
                LOG.warn(
                        "pendingRecords metric for {} could not be found. Either a legacy source or an idle source. Assuming no pending records.",
                        jobVertexID);
            }
            for (String metricName : pendingRecordsMetric) {
                filteredMetrics.put(metricName, FlinkMetric.PENDING_RECORDS);
            }

            FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT
                    .findAny(allMetricNames)
                    .ifPresent(
                            metricName ->
                                    filteredMetrics.put(
                                            metricName, FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT));
        }

        for (FlinkMetric flinkMetric : requiredMetrics) {
            String metricName =
                    flinkMetric
                            .findAny(allMetricNames)
                            .orElseThrow(
                                    () -> new MetricNotFoundException(flinkMetric, jobVertexID));
            filteredMetrics.put(metricName, flinkMetric);
        }

        return filteredMetrics;
    }

    /**
     * Queries the ids of all aggregated subtask metrics available for the given job vertex.
     *
     * @return the distinct metric ids
     */
    @VisibleForTesting
    protected Collection<String> queryAggregatedMetricNames(
            RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID) {
        AggregatedSubtaskMetricsParameters parameters = new AggregatedSubtaskMetricsParameters();
        // The path parameters are resolved positionally: job id first, then job vertex id.
        Iterator<?> pathIt = parameters.getPathParameters().iterator();
        ((JobIDPathParameter) pathIt.next()).resolve(jobID);
        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);

        // NOTE(review): blocks without a timeout; if get() is the checked-exception-throwing
        // Future#get, this method relied on those exceptions propagating undeclared - confirm.
        return restClient
                .sendRequest(
                        AggregatedSubtaskMetricsHeaders.getInstance(),
                        parameters,
                        EmptyRequestBody.getInstance())
                .get()
                .getMetrics()
                .stream()
                .map(AggregatedMetric::getId)
                .collect(Collectors.toSet());
    }

    /** Queries the aggregated values of the given metric names for every vertex. */
    protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
            queryAllAggregatedMetrics(
                    Context var1, Map<JobVertexID, Map<String, FlinkMetric>> var2);

    /**
     * Fetches the job details through the REST client of the context.
     *
     * <p>The timeout is applied with millisecond precision so that sub-second portions of the
     * configured duration are honoured instead of being truncated to whole seconds.
     *
     * @param context context of the job
     * @param clientTimeout maximum time to wait for the response
     * @return the details of the job
     * @throws Exception if the request fails or does not complete within the timeout
     */
    public JobDetailsInfo getJobDetailsInfo(
            JobAutoScalerContext<KEY> context, Duration clientTimeout) throws Exception {
        try (RestClusterClient<String> restClient = context.getRestClusterClient()) {
            return restClient
                    .getJobDetails(context.getJobID())
                    .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** Drops all in-memory state kept for the given job. */
    public void cleanup(KEY jobKey) {
        histories.remove(jobKey);
        availableVertexMetricNames.remove(jobKey);
        jobsWithGcMetrics.remove(jobKey);
    }

    /** Replaces the clock used to timestamp samples; must not be {@code null}. */
    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = Preconditions.checkNotNull(clock);
    }

    /** Exposes the live metric name cache. */
    @VisibleForTesting
    protected Map<KEY, Map<JobVertexID, Map<String, FlinkMetric>>> getAvailableVertexMetricNames() {
        return availableVertexMetricNames;
    }

    /** Exposes the live in-memory metric histories. */
    @VisibleForTesting
    protected Map<KEY, SortedMap<Instant, CollectedMetrics>> getHistories() {
        return histories;
    }
}

