/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingMetricCollector;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestApiMetricsCollector<KEY, Context extends JobAutoScalerContext<KEY>>
extends ScalingMetricCollector<KEY, Context> {
    private static final Logger LOG = LoggerFactory.getLogger(RestApiMetricsCollector.class);
    private static final Map<String, FlinkMetric> COMMON_TM_METRIC_NAMES = Map.of("Status.JVM.Memory.Heap.Max", FlinkMetric.HEAP_MEMORY_MAX, "Status.JVM.Memory.Heap.Used", FlinkMetric.HEAP_MEMORY_USED, "Status.Flink.Memory.Managed.Used", FlinkMetric.MANAGED_MEMORY_USED, "Status.JVM.Memory.Metaspace.Used", FlinkMetric.METASPACE_MEMORY_USED);
    private static final Map<String, FlinkMetric> TM_METRIC_NAMES_WITH_GC = ImmutableMap.builder().putAll(COMMON_TM_METRIC_NAMES).put((Object)"Status.JVM.GarbageCollector.All.TimeMsPerSecond", (Object)FlinkMetric.TOTAL_GC_TIME_PER_SEC).build();

    @Override
    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(Context ctx, Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames) {
        return filteredVertexMetricNames.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> this.queryAggregatedVertexMetrics(ctx, (JobVertexID)e.getKey(), (Map)e.getValue())));
    }

    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(Context ctx, JobVertexID jobVertexID, Map<String, FlinkMetric> metrics) {
        Map<FlinkMetric, AggregatedMetric> map;
        block8: {
            LOG.debug("Querying metrics {} for {}", metrics, (Object)jobVertexID);
            AggregatedSubtaskMetricsParameters parameters = new AggregatedSubtaskMetricsParameters();
            Iterator pathIt = parameters.getPathParameters().iterator();
            ((JobIDPathParameter)pathIt.next()).resolve((Object)((JobAutoScalerContext)ctx).getJobID());
            ((JobVertexIdPathParameter)pathIt.next()).resolve((Object)jobVertexID);
            ((MessageQueryParameter)parameters.getQueryParameters().iterator().next()).resolveFromString(StringUtils.join(metrics.keySet(), (String)","));
            RestClusterClient<String> restClient = ((JobAutoScalerContext)ctx).getRestClusterClient();
            try {
                AggregatedMetricsResponseBody responseBody = (AggregatedMetricsResponseBody)restClient.sendRequest((MessageHeaders)AggregatedSubtaskMetricsHeaders.getInstance(), (MessageParameters)parameters, (RequestBody)EmptyRequestBody.getInstance()).get();
                map = this.aggregateByFlinkMetric(metrics, responseBody);
                if (restClient == null) break block8;
            }
            catch (Throwable throwable) {
                if (restClient != null) {
                    try {
                        restClient.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
            restClient.close();
        }
        return map;
    }

    @Override
    protected Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) {
        Map<FlinkMetric, Metric> map;
        block8: {
            Map<String, FlinkMetric> metrics = Map.of("taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL, "taskSlotsAvailable", FlinkMetric.NUM_TASK_SLOTS_AVAILABLE);
            RestClusterClient<String> restClient = ((JobAutoScalerContext)ctx).getRestClusterClient();
            try {
                map = this.queryJmMetrics(restClient, metrics);
                if (restClient == null) break block8;
            }
            catch (Throwable throwable) {
                if (restClient != null) {
                    try {
                        restClient.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
            restClient.close();
        }
        return map;
    }

    protected Map<FlinkMetric, Metric> queryJmMetrics(RestClusterClient<?> restClient, Map<String, FlinkMetric> metrics) {
        JobManagerMetricsMessageParameters parameters = new JobManagerMetricsMessageParameters();
        Iterator queryParamIt = parameters.getQueryParameters().iterator();
        MetricsFilterParameter filterParameter = (MetricsFilterParameter)queryParamIt.next();
        filterParameter.resolve(List.copyOf(metrics.keySet()));
        MetricCollectionResponseBody responseBody = (MetricCollectionResponseBody)restClient.sendRequest((MessageHeaders)JobManagerMetricsHeaders.getInstance(), (MessageParameters)parameters, (RequestBody)EmptyRequestBody.getInstance()).get();
        return responseBody.getMetrics().stream().collect(Collectors.toMap(m -> (FlinkMetric)((Object)((Object)metrics.get(m.getId()))), m -> m));
    }

    @Override
    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
        try (RestClusterClient<String> restClient = ((JobAutoScalerContext)ctx).getRestClusterClient();){
            boolean hasGcMetrics = this.jobsWithGcMetrics.computeIfAbsent(((JobAutoScalerContext)ctx).getJobKey(), k -> {
                boolean gcMetricsFound;
                boolean bl = gcMetricsFound = !this.queryAggregatedTmMetrics(restClient, TM_METRIC_NAMES_WITH_GC).isEmpty();
                if (!gcMetricsFound) {
                    LOG.debug("No GC metrics found, using only heap information");
                } else {
                    LOG.debug("TaskManager GC metrics found");
                }
                return gcMetricsFound;
            });
            Map<FlinkMetric, AggregatedMetric> tmMetrics = this.queryAggregatedTmMetrics(restClient, hasGcMetrics ? TM_METRIC_NAMES_WITH_GC : COMMON_TM_METRIC_NAMES);
            if (!tmMetrics.isEmpty()) {
                Map<FlinkMetric, AggregatedMetric> map = tmMetrics;
                return map;
            }
            this.jobsWithGcMetrics.remove(((JobAutoScalerContext)ctx).getJobKey());
            throw new RuntimeException("Missing required TM metrics");
        }
    }

    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedTmMetrics(RestClusterClient<?> restClient, Map<String, FlinkMetric> metrics) {
        AggregateTaskManagerMetricsParameters parameters = new AggregateTaskManagerMetricsParameters();
        Iterator queryParamIt = parameters.getQueryParameters().iterator();
        MetricsFilterParameter filterParameter = (MetricsFilterParameter)queryParamIt.next();
        filterParameter.resolve(List.copyOf(metrics.keySet()));
        MetricsAggregationParameter aggregationParameter = (MetricsAggregationParameter)queryParamIt.next();
        aggregationParameter.resolve(List.of(MetricsAggregationParameter.AggregationMode.MIN, MetricsAggregationParameter.AggregationMode.MAX, MetricsAggregationParameter.AggregationMode.AVG));
        AggregatedMetricsResponseBody responseBody = (AggregatedMetricsResponseBody)restClient.sendRequest((MessageHeaders)AggregatedTaskManagerMetricsHeaders.getInstance(), (MessageParameters)parameters, (RequestBody)EmptyRequestBody.getInstance()).get();
        return this.aggregateByFlinkMetric(metrics, responseBody);
    }

    private Map<FlinkMetric, AggregatedMetric> aggregateByFlinkMetric(Map<String, FlinkMetric> metrics, AggregatedMetricsResponseBody responseBody) {
        return responseBody.getMetrics().stream().collect(Collectors.toMap(m -> (FlinkMetric)((Object)((Object)metrics.get(m.getId()))), m -> m, (m1, m2) -> new AggregatedMetric(m1.getId() + "-" + m2.getId(), m1.getMin() != null ? Double.valueOf(Math.min(m1.getMin(), m2.getMin())) : null, m1.getMax() != null ? Double.valueOf(Math.max(m1.getMax(), m2.getMax())) : null, null, m1.getSum() != null ? Double.valueOf(m1.getSum() + m2.getSum()) : null, m1.getSkew() != null ? Double.valueOf(Math.max(m1.getSkew(), m2.getSkew())) : null)));
    }
}

