/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.JobVertexScaler;
import org.apache.flink.autoscaler.ScalingRecord;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.resources.NoopResourceCheck;
import org.apache.flink.autoscaler.resources.ResourceCheck;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decides whether a computed scaling may be applied to a job and, if so, persists it.
 *
 * <p>Per-vertex parallelism decisions are delegated to {@link JobVertexScaler}. This class
 * collects them into {@link ScalingSummary} instances, evaluates the blocking conditions
 * implemented below (memory pressure, scaling disabled or excluded period, cluster resources,
 * resource quota) and finally writes the outcome to the {@link AutoScalerStateStore}.
 *
 * @param <KEY> type of the job key carried by the context
 * @param <Context> type of the autoscaler context
 */
public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
    public static final String GC_PRESSURE_MESSAGE = "GC Pressure %s is above the allowed limit for scaling operations. Please adjust the available memory manually.";
    public static final String HEAP_USAGE_MESSAGE = "Heap Usage %s is above the allowed limit for scaling operations. Please adjust the available memory manually.";
    public static final String RESOURCE_QUOTA_REACHED_MESSAGE = "Resource usage is above the allowed limit for scaling operations. Please adjust the resource quota manually.";
    private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
    private final JobVertexScaler<KEY, Context> jobVertexScaler;
    private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
    private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
    private final ResourceCheck resourceCheck;

    /**
     * Creates an executor without a cluster resource check (a {@link NoopResourceCheck} is used).
     *
     * @param autoScalerEventHandler handler used to report scaling related events
     * @param autoScalerStateStore store used to persist scaling decisions
     */
    public ScalingExecutor(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler, AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
        this(autoScalerEventHandler, autoScalerStateStore, null);
    }

    /**
     * Creates an executor.
     *
     * @param autoScalerEventHandler handler used to report scaling related events
     * @param autoScalerStateStore store used to persist scaling decisions
     * @param resourceCheck cluster resource check; {@code null} falls back to {@link NoopResourceCheck}
     */
    public ScalingExecutor(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler, AutoScalerStateStore<KEY, Context> autoScalerStateStore, @Nullable ResourceCheck resourceCheck) {
        this.jobVertexScaler = new JobVertexScaler<>(autoScalerEventHandler);
        this.autoScalerEventHandler = autoScalerEventHandler;
        this.autoScalerStateStore = autoScalerStateStore;
        this.resourceCheck = resourceCheck != null ? resourceCheck : new NoopResourceCheck();
    }

    /**
     * Computes the scaling for the job and persists it unless one of the blocking conditions
     * applies.
     *
     * @param context autoscaler context of the job
     * @param evaluatedMetrics evaluated vertex and global metrics; the recommended parallelism of
     *     every vertex that gets a scaling summary is written back into the vertex metrics
     * @param scalingHistory per-vertex scaling history
     * @param scalingTracking scaling tracking that receives a new {@link ScalingRecord} on success
     * @param now timestamp used for the history entry, the tracking record and the excluded
     *     period check
     * @param jobTopology topology of the job
     * @param delayedScaleDown delayed scale down state; cleared when the scaling is persisted
     * @return {@code true} if parallelism overrides and config changes were stored, {@code false}
     *     if nothing has to change or scaling was blocked
     * @throws Exception propagated from the invoked components
     */
    public boolean scaleResource(Context context, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory, ScalingTracking scalingTracking, Instant now, JobTopology jobTopology, DelayedScaleDown delayedScaleDown) throws Exception {
        Configuration conf = context.getConfiguration();
        Duration restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
        Map<JobVertexID, ScalingSummary> scalingSummaries = computeScalingSummary(context, evaluatedMetrics, scalingHistory, restartTime, jobTopology, delayedScaleDown);
        if (scalingSummaries.isEmpty()) {
            LOG.info("All job vertices are currently running at their target parallelism.");
            return false;
        }
        updateRecommendedParallelism(evaluatedMetrics.getVertexMetrics(), scalingSummaries);
        if (checkIfBlockedAndTriggerScalingEvent(context, scalingSummaries, conf, now)) {
            return false;
        }
        ConfigChanges configOverrides = MemoryTuning.tuneTaskManagerMemory(context, evaluatedMetrics, jobTopology, scalingSummaries, this.autoScalerEventHandler);
        boolean memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
        // The resource checks only see the tuned memory settings when memory tuning is enabled.
        Configuration effectiveConf = memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf;
        if (scalingWouldExceedMaxResources(effectiveConf, jobTopology, evaluatedMetrics, scalingSummaries, context)) {
            return false;
        }
        ScalingHistoryUtils.addToScalingHistoryAndStore(this.autoScalerStateStore, context, scalingHistory, now, scalingSummaries);
        scalingTracking.addScalingRecord(now, new ScalingRecord());
        this.autoScalerStateStore.storeScalingTracking(context, scalingTracking);
        this.autoScalerStateStore.storeParallelismOverrides(context, getVertexParallelismOverrides(evaluatedMetrics.getVertexMetrics(), scalingSummaries));
        this.autoScalerStateStore.storeConfigChanges(context, configOverrides);
        delayedScaleDown.clearAll();
        return true;
    }

    /**
     * Stores the new parallelism of every summarized vertex as its
     * {@link ScalingMetric#RECOMMENDED_PARALLELISM} metric.
     */
    private void updateRecommendedParallelism(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries) {
        scalingSummaries.forEach((vertexId, summary) -> evaluatedMetrics.get(vertexId).put(ScalingMetric.RECOMMENDED_PARALLELISM, EvaluatedScalingMetric.of(summary.getNewParallelism())));
    }

    /**
     * Builds the scaling summaries for all vertices whose parallelism should change.
     *
     * <p>Vertices listed in {@link AutoScalerOptions#VERTEX_EXCLUDE_IDS} are skipped. The result is
     * empty when the job is under memory pressure, or when none of the changed vertices is
     * reported as outside its utilization bound.
     *
     * @return map from vertex id to its scaling summary; possibly empty
     */
    @VisibleForTesting
    Map<JobVertexID, ScalingSummary> computeScalingSummary(Context context, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory, Duration restartTime, JobTopology jobTopology, DelayedScaleDown delayedScaleDown) {
        LOG.debug("Restart time used in scaling summary computation: {}", restartTime);
        if (isJobUnderMemoryPressure(context, evaluatedMetrics.getGlobalMetrics())) {
            LOG.info("Skipping vertex scaling due to memory pressure");
            return Map.of();
        }
        Map<JobVertexID, ScalingSummary> out = new HashMap<>();
        List<String> excludeVertexIdList = context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
        AtomicBoolean anyVertexOutsideBound = new AtomicBoolean(false);
        evaluatedMetrics.getVertexMetrics().forEach((v, metrics) -> {
            if (excludeVertexIdList.contains(v.toHexString())) {
                LOG.debug("Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", v);
                return;
            }
            int currentParallelism = (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
            JobVertexScaler.ParallelismChange parallelismChange = this.jobVertexScaler.computeScaleTargetParallelism(context, v, jobTopology.get(v).getInputs().values(), metrics, scalingHistory.getOrDefault(v, Collections.emptySortedMap()), restartTime, delayedScaleDown);
            if (parallelismChange.isNoChange()) {
                return;
            }
            if (parallelismChange.isOutsideUtilizationBound()) {
                anyVertexOutsideBound.set(true);
            }
            out.put(v, new ScalingSummary(currentParallelism, parallelismChange.getNewParallelism(), metrics));
        });
        // A single vertex outside its bound is enough to return the summaries of all changed
        // vertices; if there is none, nothing is scaled.
        if (!anyVertexOutsideBound.get()) {
            LOG.info("All vertex processing rates are within target.");
            return Map.of();
        }
        return out;
    }

    /**
     * Checks the current GC pressure and the average heap usage against their configured
     * thresholds and emits a {@code MemoryPressure} event for the first one that is exceeded.
     *
     * @return {@code true} if either threshold is exceeded
     */
    private boolean isJobUnderMemoryPressure(Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {
        double gcPressure = evaluatedMetrics.get(ScalingMetric.GC_PRESSURE).getCurrent();
        Configuration conf = ctx.getConfiguration();
        if (gcPressure > conf.get(AutoScalerOptions.GC_PRESSURE_THRESHOLD)) {
            this.autoScalerEventHandler.handleEvent(ctx, AutoScalerEventHandler.Type.Normal, "MemoryPressure", String.format(GC_PRESSURE_MESSAGE, gcPressure), "gcPressure", conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
            return true;
        }
        double heapUsage = evaluatedMetrics.get(ScalingMetric.HEAP_MAX_USAGE_RATIO).getAverage();
        if (heapUsage > conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)) {
            this.autoScalerEventHandler.handleEvent(ctx, AutoScalerEventHandler.Type.Normal, "MemoryPressure", String.format(HEAP_USAGE_MESSAGE, heapUsage), "heapUsage", conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
            return true;
        }
        return false;
    }

    /**
     * Runs the cluster resource check followed by the resource quota check.
     *
     * <p>A {@code ResourceQuotaReached} warning event is emitted when the quota check fails.
     *
     * @param tunedConfig configuration the checks are evaluated against
     * @return {@code true} if either check rejects the scaling
     */
    @VisibleForTesting
    protected boolean scalingWouldExceedMaxResources(Configuration tunedConfig, JobTopology jobTopology, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries, Context ctx) {
        if (scalingWouldExceedClusterResources(tunedConfig, evaluatedMetrics, scalingSummaries, ctx)) {
            return true;
        }
        if (scalingWouldExceedResourceQuota(tunedConfig, jobTopology, scalingSummaries, ctx)) {
            this.autoScalerEventHandler.handleEvent(ctx, AutoScalerEventHandler.Type.Warning, "ResourceQuotaReached", RESOURCE_QUOTA_REACHED_MESSAGE, null, tunedConfig.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
            return true;
        }
        return false;
    }

    /**
     * Asks the {@link ResourceCheck} whether the TaskManagers needed after the rescale can be
     * scheduled.
     *
     * <p>The check is skipped (returns {@code false}) when the TaskManager CPU or memory is not
     * positive. It blocks (returns {@code true}) while the
     * {@link ScalingMetric#NUM_TASK_SLOTS_USED} global metric is missing.
     */
    private boolean scalingWouldExceedClusterResources(Configuration tunedConfig, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries, JobAutoScalerContext<?> ctx) {
        double taskManagerCpu = ctx.getTaskManagerCpu().orElse(0.0);
        MemorySize taskManagerMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
        if (taskManagerCpu <= 0.0 || taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
            return false;
        }
        Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics = evaluatedMetrics.getGlobalMetrics();
        if (!globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED)) {
            LOG.info("JM metrics not ready yet");
            return true;
        }
        int numTaskSlotsUsed = (int) globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
        int numTaskSlotsAfterRescale = ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(evaluatedMetrics.getVertexMetrics(), scalingSummaries, numTaskSlotsUsed);
        int taskSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
        int currentNumTms = requiredTaskManagers(numTaskSlotsUsed, taskSlotsPerTm);
        int newNumTms = requiredTaskManagers(numTaskSlotsAfterRescale, taskSlotsPerTm);
        return !this.resourceCheck.trySchedule(currentNumTms, newNumTms, taskManagerCpu, taskManagerMemory);
    }

    /**
     * Checks the projected CPU and memory usage after the rescale against
     * {@link AutoScalerOptions#CPU_QUOTA} and {@link AutoScalerOptions#MEMORY_QUOTA}.
     *
     * <p>The slot demand of each slot sharing group is taken as the maximum parallelism among its
     * vertices that have a scaling summary; the demands of all groups are summed and converted to
     * a TaskManager count, rounding up so that a partially used TaskManager is counted as a whole
     * one. The quotas are only evaluated when the new TaskManager count exceeds the current one.
     *
     * @return {@code true} if a configured quota would be exceeded; {@code false} otherwise,
     *     including when there is no slot sharing group information or no quota is configured
     */
    protected static boolean scalingWouldExceedResourceQuota(Configuration tunedConfig, JobTopology jobTopology, Map<JobVertexID, ScalingSummary> scalingSummaries, JobAutoScalerContext<?> ctx) {
        if (jobTopology == null || jobTopology.getSlotSharingGroupMapping().isEmpty()) {
            return false;
        }
        Optional<Double> cpuQuota = tunedConfig.getOptional(AutoScalerOptions.CPU_QUOTA);
        Optional<MemorySize> memoryQuota = tunedConfig.getOptional(AutoScalerOptions.MEMORY_QUOTA);
        MemorySize tmMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
        double tmCpu = ctx.getTaskManagerCpu().orElse(0.0);
        if (!cpuQuota.isPresent() && !memoryQuota.isPresent()) {
            return false;
        }

        Map<SlotSharingGroupId, Integer> currentSlotSharingGroupMaxParallelisms = new HashMap<>();
        Map<SlotSharingGroupId, Integer> newSlotSharingGroupMaxParallelisms = new HashMap<>();
        for (Map.Entry<SlotSharingGroupId, Set<JobVertexID>> e : jobTopology.getSlotSharingGroupMapping().entrySet()) {
            int currentMaxParallelism = e.getValue().stream().filter(scalingSummaries::containsKey).mapToInt(v -> scalingSummaries.get(v).getCurrentParallelism()).max().orElse(0);
            currentSlotSharingGroupMaxParallelisms.put(e.getKey(), currentMaxParallelism);
            int newMaxParallelism = e.getValue().stream().filter(scalingSummaries::containsKey).mapToInt(v -> scalingSummaries.get(v).getNewParallelism()).max().orElse(0);
            newSlotSharingGroupMaxParallelisms.put(e.getKey(), newMaxParallelism);
        }

        int numSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
        int currentTotalSlots = currentSlotSharingGroupMaxParallelisms.values().stream().mapToInt(Integer::intValue).sum();
        int newTotalSlots = newSlotSharingGroupMaxParallelisms.values().stream().mapToInt(Integer::intValue).sum();
        // Round up: truncating division would drop the partially filled TaskManager and
        // under-estimate the resources compared against the quota.
        int currentNumTms = requiredTaskManagers(currentTotalSlots, numSlotsPerTm);
        int newNumTms = requiredTaskManagers(newTotalSlots, numSlotsPerTm);
        if (newNumTms <= currentNumTms) {
            LOG.debug("Skipping quota check due to new resource allocation is less or equals than the current");
            return false;
        }
        if (cpuQuota.isPresent()) {
            LOG.debug("CPU resource quota is {}, checking limits", cpuQuota.get());
            double totalCPU = tmCpu * (double) newNumTms;
            if (totalCPU > cpuQuota.get()) {
                LOG.info("CPU resource quota reached with value: {}", totalCPU);
                return true;
            }
        }
        if (memoryQuota.isPresent()) {
            LOG.debug("Memory resource quota is {}, checking limits", memoryQuota.get());
            long totalMemory = tmMemory.getBytes() * (long) newNumTms;
            if (totalMemory > memoryQuota.get().getBytes()) {
                LOG.info("Memory resource quota reached with value: {}", new MemorySize(totalMemory));
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the number of TaskManagers needed to provide {@code numTaskSlots} slots, rounding up.
     */
    private static int requiredTaskManagers(int numTaskSlots, int taskSlotsPerTm) {
        return (int) Math.ceil((double) numTaskSlots / (double) taskSlotsPerTm);
    }

    /**
     * Builds the parallelism override map for all vertices that have evaluated metrics: the new
     * parallelism for summarized vertices, the current parallelism for all others.
     *
     * @return map from vertex id string to parallelism string
     */
    private static Map<String, String> getVertexParallelismOverrides(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, Map<JobVertexID, ScalingSummary> summaries) {
        Map<String, String> overrides = new HashMap<>();
        evaluatedMetrics.forEach((id, metrics) -> {
            ScalingSummary summary = summaries.get(id);
            int parallelism = summaries.containsKey(id) ? summary.getNewParallelism() : (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
            overrides.put(id.toString(), String.valueOf(parallelism));
        });
        return overrides;
    }

    /**
     * Emits the scaling event for the given summaries and reports whether execution is blocked.
     *
     * <p>The event is emitted in every case; only its message differs between "disabled by
     * config", "excluded period" and "enabled".
     *
     * @return {@code true} if scaling is disabled or {@code now} falls into an excluded period
     */
    private boolean checkIfBlockedAndTriggerScalingEvent(Context context, Map<JobVertexID, ScalingSummary> scalingSummaries, Configuration conf, Instant now) {
        boolean scaleEnabled = conf.get(AutoScalerOptions.SCALING_ENABLED);
        boolean isExcluded = CalendarUtils.inExcludedPeriods(conf, now);
        String message;
        if (!scaleEnabled) {
            message = "Scaling execution disabled by config " + String.format("%s:%s, recommended parallelism change:", AutoScalerOptions.SCALING_ENABLED.key(), false);
        } else if (isExcluded) {
            message = "Scaling execution disabled by config " + String.format("%s:%s, recommended parallelism change:", AutoScalerOptions.EXCLUDED_PERIODS.key(), conf.get(AutoScalerOptions.EXCLUDED_PERIODS));
        } else {
            message = "Scaling execution enabled, begin scaling vertices:";
        }
        this.autoScalerEventHandler.handleScalingEvent(context, scalingSummaries, message, conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
        return !scaleEnabled || isExcluded;
    }

    /** Forwards the clock to the underlying {@link JobVertexScaler}. */
    @VisibleForTesting
    void setClock(Clock clock) {
        this.jobVertexScaler.setClock(clock);
    }
}

