/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes the parallelism a single job vertex should be rescaled to, based on its evaluated
 * scaling metrics.
 *
 * <p>The computation derives a scale factor from the target processing capacity and the average
 * true processing rate, clamps it by the configured maximum scale-up / scale-down factors, aligns
 * the resulting parallelism with the number of key groups or source partitions where applicable,
 * and finally decides whether the change must be blocked (previous scale up was ineffective) or
 * postponed (scale-down interval).
 *
 * @param <KEY> key type of the autoscaler context
 * @param <Context> autoscaler context type
 */
public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
    private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);

    /** Event reason used when the previous scale up did not deliver the expected rate increase. */
    @VisibleForTesting
    protected static final String INEFFECTIVE_SCALING = "IneffectiveScaling";

    /** Format arguments: vertex, expected increase, actual increase, "enabled" / "disabled". */
    @VisibleForTesting
    protected static final String INEFFECTIVE_MESSAGE_FORMAT =
            "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). "
                    + "Blocking of ineffective scaling decisions is %s";

    /** Event reason used when key-group / partition alignment limited the parallelism. */
    @VisibleForTesting
    protected static final String SCALING_LIMITED = "ScalingLimited";

    /**
     * Format arguments: vertex, expected parallelism, actual parallelism, number of key groups or
     * partitions, upper bound, parallelism lower limit.
     */
    @VisibleForTesting
    protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
            "Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). "
                    + "Scaling limited due to numKeyGroupsOrPartitions : %s\uff0c"
                    + "upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
                    + "parallelismLowerLimit: %s.";

    /** Time source for the scale-down interval; replaceable through {@link #setClock(Clock)}. */
    private Clock clock = Clock.system(ZoneId.systemDefault());

    private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;

    /**
     * @param autoScalerEventHandler handler that receives the "ineffective scaling" and "scaling
     *     limited" events emitted by this scaler
     */
    public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler) {
        this.autoScalerEventHandler = autoScalerEventHandler;
    }

    /**
     * Computes the parallelism change for {@code vertex}.
     *
     * <p>Returns {@link ParallelismChange#noChange()} when the average true processing rate or the
     * target capacity is NaN, or when the computed parallelism equals the current one (in the
     * latter case any pending delayed scale down for the vertex is cleared). Otherwise the capped
     * target capacity is stored in {@code evaluatedMetrics} under {@link
     * ScalingMetric#EXPECTED_PROCESSING_RATE} before the block / delay checks run.
     *
     * @param context autoscaler context providing the configuration
     * @param vertex vertex to compute the parallelism for
     * @param inputShipStrategies ship strategies of the vertex inputs
     * @param evaluatedMetrics evaluated metrics of the vertex; mutated as described above
     * @param history past scaling summaries of the vertex keyed by time
     * @param restartTime restart time passed on to the target capacity computation
     * @param delayedScaleDown tracker for delayed scale-down requests
     * @return the parallelism change to apply, or {@link ParallelismChange#noChange()}
     */
    public ParallelismChange computeScaleTargetParallelism(
            Context context,
            JobVertexID vertex,
            Collection<ShipStrategy> inputShipStrategies,
            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
            SortedMap<Instant, ScalingSummary> history,
            Duration restartTime,
            DelayedScaleDown delayedScaleDown) {
        Configuration conf = context.getConfiguration();
        int currentParallelism = (int) evaluatedMetrics.get(ScalingMetric.PARALLELISM).getCurrent();
        double averageTrueProcessingRate =
                evaluatedMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        if (Double.isNaN(averageTrueProcessingRate)) {
            LOG.warn(
                    "True processing rate is not available for {}, cannot compute new parallelism",
                    vertex);
            return ParallelismChange.noChange();
        }

        double targetCapacity =
                AutoScalerUtils.getTargetProcessingCapacity(
                        evaluatedMetrics,
                        conf,
                        conf.get(AutoScalerOptions.UTILIZATION_TARGET),
                        true,
                        restartTime);
        if (Double.isNaN(targetCapacity)) {
            LOG.warn(
                    "Target data rate is not available for {}, cannot compute new parallelism",
                    vertex);
            return ParallelismChange.noChange();
        }
        LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);

        // Clamp the raw scale factor into [1 - maxScaleDown, 1 + maxScaleUp].
        double scaleFactor = targetCapacity / averageTrueProcessingRate;
        double minScaleFactor = 1.0 - conf.get(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR);
        double maxScaleFactor = 1.0 + conf.get(AutoScalerOptions.MAX_SCALE_UP_FACTOR);
        if (scaleFactor < minScaleFactor) {
            LOG.debug(
                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
                    scaleFactor,
                    vertex,
                    minScaleFactor);
            scaleFactor = minScaleFactor;
        } else if (scaleFactor > maxScaleFactor) {
            LOG.debug(
                    "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
                    scaleFactor,
                    vertex,
                    maxScaleFactor);
            scaleFactor = maxScaleFactor;
        }

        // The capacity we actually aim for after the scale factor has been clamped.
        double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
        LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);

        // The configured limits are widened so that they always contain the current parallelism.
        int parallelismLowerLimit =
                Math.min(
                        currentParallelism,
                        conf.getInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM));
        int parallelismUpperLimit =
                Math.max(
                        currentParallelism,
                        conf.getInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM));

        int newParallelism =
                scale(
                        vertex,
                        currentParallelism,
                        inputShipStrategies,
                        (int) evaluatedMetrics.get(ScalingMetric.NUM_SOURCE_PARTITIONS).getCurrent(),
                        (int) evaluatedMetrics.get(ScalingMetric.MAX_PARALLELISM).getCurrent(),
                        scaleFactor,
                        parallelismLowerLimit,
                        parallelismUpperLimit,
                        autoScalerEventHandler,
                        context);

        if (newParallelism == currentParallelism) {
            // Nothing to do, so a previously requested (delayed) scale down is obsolete.
            delayedScaleDown.clearVertex(vertex);
            return ParallelismChange.noChange();
        }

        // Record the expectation of this scaling decision; detectIneffectiveScaleUp reads it back
        // from the last scaling summary.
        evaluatedMetrics.put(
                ScalingMetric.EXPECTED_PROCESSING_RATE,
                EvaluatedScalingMetric.of(cappedTargetCapacity));

        return detectBlockScaling(
                context,
                vertex,
                conf,
                evaluatedMetrics,
                history,
                currentParallelism,
                newParallelism,
                delayedScaleDown);
    }

    /**
     * Decides whether the computed change is applied, blocked, or delayed.
     *
     * <p>A scale up clears any delayed scale down and is blocked only when the latest summary in
     * {@code history} was a scale up to the current parallelism and that scale up is judged
     * ineffective with blocking enabled. A scale down is routed through the scale-down interval.
     */
    private ParallelismChange detectBlockScaling(
            Context context,
            JobVertexID vertex,
            Configuration conf,
            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
            SortedMap<Instant, ScalingSummary> history,
            int currentParallelism,
            int newParallelism,
            DelayedScaleDown delayedScaleDown) {
        Preconditions.checkArgument(
                currentParallelism != newParallelism,
                "The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");

        boolean outsideUtilizationBound = outsideUtilizationBound(vertex, evaluatedMetrics);
        boolean scaledUp = currentParallelism < newParallelism;
        if (!scaledUp) {
            return applyScaleDownInterval(
                    delayedScaleDown, vertex, conf, newParallelism, outsideUtilizationBound);
        }

        // Scaling up supersedes any pending delayed scale down.
        delayedScaleDown.clearVertex(vertex);

        // Without past scaling decisions there is nothing to judge effectiveness against.
        if (history.isEmpty()) {
            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
        }

        ScalingSummary lastSummary = history.get(history.lastKey());
        boolean lastScaleUpLedHere =
                currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp();
        if (lastScaleUpLedHere
                && detectIneffectiveScaleUp(context, vertex, conf, evaluatedMetrics, lastSummary)) {
            return ParallelismChange.noChange();
        }
        return ParallelismChange.build(newParallelism, outsideUtilizationBound);
    }

    /**
     * Returns {@code true} when the average true processing rate is below the scale-up rate
     * threshold or above the scale-down rate threshold.
     */
    private static boolean outsideUtilizationBound(
            JobVertexID vertex, Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
        double trueProcessingRate = metrics.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        double scaleUpRateThreshold =
                metrics.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent();
        double scaleDownRateThreshold =
                metrics.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent();

        if (trueProcessingRate < scaleUpRateThreshold
                || trueProcessingRate > scaleDownRateThreshold) {
            LOG.debug(
                    "Vertex {} processing rate {} is outside ({}, {})",
                    vertex,
                    trueProcessingRate,
                    scaleUpRateThreshold,
                    scaleDownRateThreshold);
            return true;
        }
        LOG.debug(
                "Vertex {} processing rate {} is within target ({}, {})",
                vertex,
                trueProcessingRate,
                scaleUpRateThreshold,
                scaleDownRateThreshold);
        return false;
    }

    /**
     * Applies the configured scale-down interval to a scale-down request.
     *
     * <p>With a non-positive interval the request is returned immediately. Otherwise the request
     * is registered with {@code delayedScaleDown}; while the first trigger time lies inside the
     * interval window no change is returned, afterwards the recommendation obtained from {@code
     * getMaxRecommendedParallelism(windowStartTime)} is used.
     */
    private ParallelismChange applyScaleDownInterval(
            DelayedScaleDown delayedScaleDown,
            JobVertexID vertex,
            Configuration conf,
            int newParallelism,
            boolean outsideUtilizationBound) {
        Duration scaleDownInterval = conf.get(AutoScalerOptions.SCALE_DOWN_INTERVAL);
        if (scaleDownInterval.toMillis() <= 0L) {
            // Delaying is disabled.
            return ParallelismChange.build(newParallelism, outsideUtilizationBound);
        }

        Instant now = clock.instant();
        Instant windowStartTime = now.minus(scaleDownInterval);
        DelayedScaleDown.VertexDelayedScaleDownInfo delayedScaleDownInfo =
                delayedScaleDown.triggerScaleDown(
                        vertex, now, newParallelism, outsideUtilizationBound);

        // The first trigger is younger than the interval: keep waiting.
        if (windowStartTime.isBefore(delayedScaleDownInfo.getFirstTriggerTime())) {
            if (now.equals(delayedScaleDownInfo.getFirstTriggerTime())) {
                LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
            } else {
                LOG.debug(
                        "Try to skip immediate scale down within scale-down interval for {}",
                        vertex);
            }
            return ParallelismChange.noChange();
        }

        // NOTE(review): presumably the highest parallelism recommended within the window, so a
        // short dip does not cause an overly aggressive scale down - confirm in DelayedScaleDown.
        DelayedScaleDown.RecommendedParallelism maxRecommendedParallelism =
                delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
        return ParallelismChange.build(
                maxRecommendedParallelism.getParallelism(),
                maxRecommendedParallelism.isOutsideUtilizationBound());
    }

    /**
     * Checks whether the last scale up achieved at least the configured fraction of the expected
     * processing-rate increase.
     *
     * <p>When it did not, an {@link #INEFFECTIVE_SCALING} event is emitted regardless of whether
     * blocking is enabled.
     *
     * @return {@code true} only when the last scale up was ineffective and blocking of ineffective
     *     scalings is enabled
     */
    private boolean detectIneffectiveScaleUp(
            Context context,
            JobVertexID vertex,
            Configuration conf,
            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
            ScalingSummary lastSummary) {
        double lastProcRate =
                lastSummary.getMetrics().get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        double lastExpectedProcRate =
                lastSummary.getMetrics().get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent();
        double currentProcRate =
                evaluatedMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();

        // Effectiveness = share of the expected rate increase that actually materialised.
        // NOTE(review): if expectedIncrease is zero the ratio is NaN or infinite, and a NaN ratio
        // fails the comparison below (treated as ineffective); a negative expectedIncrease flips
        // the sign of the ratio. Confirm whether callers can produce such summaries.
        double actualIncrease = currentProcRate - lastProcRate;
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        boolean withinEffectiveThreshold =
                actualIncrease / expectedIncrease
                        >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
        if (withinEffectiveThreshold) {
            return false;
        }

        boolean blockIneffectiveScalings =
                conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED);
        String message =
                String.format(
                        INEFFECTIVE_MESSAGE_FORMAT,
                        vertex,
                        expectedIncrease,
                        actualIncrease,
                        blockIneffectiveScalings ? "enabled" : "disabled");
        autoScalerEventHandler.handleEvent(
                context,
                AutoScalerEventHandler.Type.Normal,
                INEFFECTIVE_SCALING,
                message,
                "ineffective" + vertex + expectedIncrease,
                conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
        return blockIneffectiveScalings;
    }

    /**
     * Scales {@code currentParallelism} by {@code scaleFactor} and fits the result into the
     * allowed range and, where applicable, to the number of key groups or source partitions.
     *
     * <p>The scaled value is rounded up and clamped to {@code [parallelismLowerLimit,
     * min(maxParallelism, parallelismUpperLimit)]}, the upper bound taking precedence. If the
     * vertex has source partitions or a {@link ShipStrategy#HASH} input, the method then searches
     * upwards for a parallelism that evenly divides the number of partitions / key groups (or,
     * in {@link KeyGroupOrPartitionsAdjustMode#MAXIMIZE_UTILISATION} mode, one that lowers the
     * integer per-subtask share). If none exists within the bound, it falls back to a smaller
     * parallelism and emits a {@link #SCALING_LIMITED} warning event.
     *
     * @param vertex vertex being scaled (used for logging and events)
     * @param currentParallelism current parallelism of the vertex
     * @param inputShipStrategies ship strategies of the vertex inputs
     * @param numSourcePartitions number of source partitions; values {@code <= 0} mean unknown
     * @param maxParallelism operator max parallelism (number of key groups)
     * @param scaleFactor factor to multiply the current parallelism with
     * @param parallelismLowerLimit lower limit for the result
     * @param parallelismUpperLimit upper limit for the result
     * @param eventHandler receiver of the "scaling limited" event
     * @param context autoscaler context providing the configuration
     * @return the new parallelism; never above {@code min(maxParallelism, parallelismUpperLimit)}
     * @throws IllegalArgumentException if {@code parallelismLowerLimit > parallelismUpperLimit}
     */
    @VisibleForTesting
    protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
            JobVertexID vertex,
            int currentParallelism,
            Collection<ShipStrategy> inputShipStrategies,
            int numSourcePartitions,
            int maxParallelism,
            double scaleFactor,
            int parallelismLowerLimit,
            int parallelismUpperLimit,
            AutoScalerEventHandler<KEY, Context> eventHandler,
            Context context) {
        Preconditions.checkArgument(
                parallelismLowerLimit <= parallelismUpperLimit,
                "The parallelism lower limitation must not be greater than the parallelism upper limitation.");
        if (parallelismLowerLimit > maxParallelism) {
            LOG.warn(
                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
                    parallelismLowerLimit,
                    maxParallelism);
        }
        if (maxParallelism < parallelismUpperLimit && parallelismUpperLimit != Integer.MAX_VALUE) {
            LOG.debug(
                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
                    parallelismUpperLimit,
                    maxParallelism);
        }

        // Cap in double space first so the narrowing cast cannot overflow.
        int newParallelism =
                (int) Math.min(Math.ceil(scaleFactor * currentParallelism), Integer.MAX_VALUE);

        // Apply the limits; the upper bound wins over the lower limit.
        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
        newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);

        boolean adjustByMaxParallelismOrPartitions =
                numSourcePartitions > 0 || inputShipStrategies.contains(ShipStrategy.HASH);
        if (!adjustByMaxParallelismOrPartitions) {
            return newParallelism;
        }

        final int numKeyGroupsOrPartitions =
                numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
        final int upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
        final KeyGroupOrPartitionsAdjustMode mode =
                context.getConfiguration()
                        .get(AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);

        // Search upwards for a parallelism that divides the key groups / partitions evenly. In
        // MAXIMIZE_UTILISATION mode a candidate is also accepted as soon as it lowers the integer
        // per-subtask share compared to newParallelism.
        for (int candidate = newParallelism; candidate <= upperBoundForAlignment; candidate++) {
            if (numKeyGroupsOrPartitions % candidate == 0
                    || (mode == KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION
                            && numKeyGroupsOrPartitions / candidate
                                    < numKeyGroupsOrPartitions / newParallelism)) {
                return candidate;
            }
        }

        // Nothing suitable above: walk downwards to the first parallelism whose integer
        // per-subtask share exceeds the one at newParallelism. Take it if it divides evenly,
        // otherwise step back up by one, i.e. the smallest parallelism that keeps the share.
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
                if (numKeyGroupsOrPartitions % p != 0) {
                    p++;
                }
                break;
            }
        }

        // Respect the lower limit, but never exceed the upper bound: a lower limit above the
        // operator max parallelism is capped, consistent with the clamp of newParallelism above
        // and with the warning logged at the top of this method.
        p = Math.min(Math.max(p, parallelismLowerLimit), upperBound);

        String message =
                String.format(
                        SCALE_LIMITED_MESSAGE_FORMAT,
                        vertex,
                        newParallelism,
                        p,
                        numKeyGroupsOrPartitions,
                        upperBound,
                        parallelismLowerLimit);
        eventHandler.handleEvent(
                context,
                AutoScalerEventHandler.Type.Warning,
                SCALING_LIMITED,
                message,
                SCALING_LIMITED + vertex + newParallelism,
                context.getConfiguration().get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
        return p;
    }

    /** Replaces the clock used for the scale-down interval; {@code clock} must not be null. */
    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = Preconditions.checkNotNull(clock);
    }

    /** Strategy used to align the parallelism with the number of key groups or partitions. */
    public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
        EVENLY_SPREAD(
                "This mode ensures that the parallelism adjustment attempts to evenly distribute data across subtasks. It is particularly effective for source vertices that are aware of partition counts or vertices after 'keyBy' operation. The goal is to have the number of key groups or partitions be divisible by the set parallelism, ensuring even data distribution and reducing data skew."),
        MAXIMIZE_UTILISATION(
                "This model is to maximize resource utilization. In this mode, an attempt is made to set the parallelism that meets the current consumption rate requirements. It is not enforced that the number of key groups or partitions is divisible by the parallelism.");

        private final InlineElement description;

        KeyGroupOrPartitionsAdjustMode(String description) {
            this.description = TextElement.text(description);
        }

        @Override
        public InlineElement getDescription() {
            return description;
        }
    }

    /**
     * Result of a parallelism computation: either the shared "no change" instance or a new
     * parallelism together with the utilization-bound flag.
     */
    public static class ParallelismChange {
        /** Shared sentinel; the only instance whose parallelism is not positive. */
        private static final ParallelismChange NO_CHANGE = new ParallelismChange(-1, false);

        private final int newParallelism;
        private final boolean outsideUtilizationBound;

        private ParallelismChange(int newParallelism, boolean outsideUtilizationBound) {
            this.newParallelism = newParallelism;
            this.outsideUtilizationBound = outsideUtilizationBound;
        }

        /** Returns {@code true} only for the instance returned by {@link #noChange()}. */
        public boolean isNoChange() {
            return this == NO_CHANGE;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ParallelismChange that = (ParallelismChange) o;
            return newParallelism == that.newParallelism
                    && outsideUtilizationBound == that.outsideUtilizationBound;
        }

        @Override
        public int hashCode() {
            return Objects.hash(newParallelism, outsideUtilizationBound);
        }

        @Override
        public String toString() {
            if (isNoChange()) {
                return "NoParallelismChange";
            }
            return "ParallelismChange{newParallelism="
                    + newParallelism
                    + ", outsideUtilizationBound="
                    + outsideUtilizationBound
                    + "}";
        }

        /**
         * Creates a change to {@code newParallelism}.
         *
         * @throws IllegalArgumentException if {@code newParallelism} is not greater than 0
         */
        public static ParallelismChange build(int newParallelism, boolean outsideUtilizationBound) {
            Preconditions.checkArgument(
                    newParallelism > 0, "The parallelism should be greater than 0.");
            return new ParallelismChange(newParallelism, outsideUtilizationBound);
        }

        /** Returns the shared "no change" instance. */
        public static ParallelismChange noChange() {
            return NO_CHANGE;
        }

        /** Returns the new parallelism; {@code -1} for the "no change" instance. */
        public int getNewParallelism() {
            return newParallelism;
        }

        public boolean isOutsideUtilizationBound() {
            return outsideUtilizationBound;
        }
    }
}

