/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.DelayedScaleDown;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingExecutor;
import org.apache.flink.autoscaler.ScalingMetricCollector;
import org.apache.flink.autoscaler.ScalingMetricEvaluator;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
implements JobAutoScaler<KEY, Context> {
    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
    @VisibleForTesting
    protected static final String AUTOSCALER_ERROR = "AutoscalerError";
    private final ScalingMetricCollector<KEY, Context> metricsCollector;
    private final ScalingMetricEvaluator evaluator;
    private final ScalingExecutor<KEY, Context> scalingExecutor;
    private final AutoScalerEventHandler<KEY, Context> eventHandler;
    private final ScalingRealizer<KEY, Context> scalingRealizer;
    private final AutoScalerStateStore<KEY, Context> stateStore;
    private Clock clock = Clock.systemDefaultZone();
    @VisibleForTesting
    final Map<KEY, EvaluatedMetrics> lastEvaluatedMetrics = new ConcurrentHashMap<KEY, EvaluatedMetrics>();
    @VisibleForTesting
    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<KEY, AutoscalerFlinkMetrics>();

    public JobAutoScalerImpl(ScalingMetricCollector<KEY, Context> metricsCollector, ScalingMetricEvaluator evaluator, ScalingExecutor<KEY, Context> scalingExecutor, AutoScalerEventHandler<KEY, Context> eventHandler, ScalingRealizer<KEY, Context> scalingRealizer, AutoScalerStateStore<KEY, Context> stateStore) {
        this.metricsCollector = metricsCollector;
        this.evaluator = evaluator;
        this.scalingExecutor = scalingExecutor;
        this.eventHandler = eventHandler;
        this.scalingRealizer = scalingRealizer;
        this.stateStore = stateStore;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void scale(Context ctx) throws Exception {
        AutoscalerFlinkMetrics autoscalerMetrics = this.getOrInitAutoscalerFlinkMetrics(ctx);
        try {
            if (!((JobAutoScalerContext)ctx).getConfiguration().getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
                LOG.debug("Autoscaler is disabled");
                this.stateStore.clearAll(ctx);
                this.stateStore.flush(ctx);
                return;
            }
            if (((JobAutoScalerContext)ctx).getJobStatus() != JobStatus.RUNNING) {
                LOG.debug("Autoscaler is waiting for stable, running state");
                this.lastEvaluatedMetrics.remove(((JobAutoScalerContext)ctx).getJobKey());
                return;
            }
            this.runScalingLogic(ctx, autoscalerMetrics);
            this.stateStore.flush(ctx);
        }
        catch (NotReadyException e) {
            LOG.debug("Not ready for scaling", (Throwable)e);
        }
        catch (Throwable e) {
            this.onError(ctx, autoscalerMetrics, e);
        }
        finally {
            try {
                this.applyParallelismOverrides(ctx);
                this.applyConfigOverrides(ctx);
            }
            catch (Exception e) {
                LOG.error("Error applying overrides.", (Throwable)e);
                this.onError(ctx, autoscalerMetrics, e);
            }
        }
    }

    @Override
    public void cleanup(Context ctx) {
        LOG.info("Cleaning up autoscaling meta data");
        this.metricsCollector.cleanup(((JobAutoScalerContext)ctx).getJobKey());
        this.lastEvaluatedMetrics.remove(((JobAutoScalerContext)ctx).getJobKey());
        this.flinkMetrics.remove(((JobAutoScalerContext)ctx).getJobKey());
        try {
            this.stateStore.clearAll(ctx);
            this.stateStore.flush(ctx);
        }
        catch (Exception e) {
            LOG.error("Error cleaning up autoscaling meta data for {}", ((JobAutoScalerContext)ctx).getJobKey(), (Object)e);
        }
    }

    @VisibleForTesting
    protected Map<String, String> getParallelismOverrides(Context ctx) throws Exception {
        return this.stateStore.getParallelismOverrides(ctx);
    }

    @VisibleForTesting
    protected void applyParallelismOverrides(Context ctx) throws Exception {
        Map<String, String> overrides = this.getParallelismOverrides(ctx);
        if (overrides.isEmpty()) {
            return;
        }
        LOG.debug("Applying parallelism overrides: {}", overrides);
        Configuration conf = ((JobAutoScalerContext)ctx).getConfiguration();
        HashMap<String, String> userOverrides = new HashMap<String, String>((Map)conf.get(PipelineOptions.PARALLELISM_OVERRIDES));
        List exclusions = (List)conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
        overrides.forEach((k, v) -> {
            if (exclusions.contains(k)) {
                userOverrides.putIfAbsent((String)k, (String)v);
            } else {
                userOverrides.put((String)k, (String)v);
            }
        });
        this.scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
    }

    @VisibleForTesting
    void applyConfigOverrides(Context ctx) throws Exception {
        if (!((Boolean)((JobAutoScalerContext)ctx).getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)).booleanValue()) {
            return;
        }
        ConfigChanges configChanges = this.stateStore.getConfigChanges(ctx);
        LOG.debug("Applying config overrides: {}", (Object)configChanges);
        this.scalingRealizer.realizeConfigOverrides(ctx, configChanges);
    }

    private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics) throws Exception {
        CollectedMetricHistory collectedMetrics = this.metricsCollector.updateMetrics(ctx, this.stateStore);
        JobTopology jobTopology = collectedMetrics.getJobTopology();
        Instant now = this.clock.instant();
        ScalingTracking scalingTracking = ScalingHistoryUtils.getTrimmedScalingTracking(this.stateStore, ctx, now);
        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory = ScalingHistoryUtils.getTrimmedScalingHistory(this.stateStore, ctx, now);
        if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(collectedMetrics.getJobRunningTs(), jobTopology, scalingHistory)) {
            this.stateStore.storeScalingTracking(ctx, scalingTracking);
        }
        if (collectedMetrics.getMetricHistory().size() < 2) {
            return;
        }
        LOG.debug("Collected metrics: {}", (Object)collectedMetrics);
        Duration restartTime = scalingTracking.getMaxRestartTimeOrDefault(((JobAutoScalerContext)ctx).getConfiguration());
        EvaluatedMetrics evaluatedMetrics = this.evaluator.evaluate(((JobAutoScalerContext)ctx).getConfiguration(), collectedMetrics, restartTime);
        LOG.debug("Evaluated metrics: {}", (Object)evaluatedMetrics);
        this.lastEvaluatedMetrics.put(((JobAutoScalerContext)ctx).getJobKey(), evaluatedMetrics);
        AutoscalerFlinkMetrics.initRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
        autoscalerMetrics.registerScalingMetrics(jobTopology.getVerticesInTopologicalOrder(), () -> this.lastEvaluatedMetrics.get(ctx.getJobKey()));
        if (!collectedMetrics.isFullyCollected()) {
            AutoscalerFlinkMetrics.resetRecommendedParallelism(evaluatedMetrics.getVertexMetrics());
            return;
        }
        DelayedScaleDown delayedScaleDown = this.stateStore.getDelayedScaleDown(ctx);
        boolean parallelismChanged = this.scalingExecutor.scaleResource(ctx, evaluatedMetrics, scalingHistory, scalingTracking, now, jobTopology, delayedScaleDown);
        if (delayedScaleDown.isUpdated()) {
            this.stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);
        }
        if (parallelismChanged) {
            autoscalerMetrics.incrementScaling();
        } else {
            autoscalerMetrics.incrementBalanced();
        }
    }

    private void onError(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics, Throwable e) {
        LOG.error("Error while scaling job", e);
        autoscalerMetrics.incrementError();
        this.eventHandler.handleException(ctx, AUTOSCALER_ERROR, e);
    }

    private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(Context ctx) {
        return this.flinkMetrics.computeIfAbsent(((JobAutoScalerContext)ctx).getJobKey(), id -> new AutoscalerFlinkMetrics(ctx.getMetricGroup().addGroup("AutoScaler")));
    }

    @VisibleForTesting
    void setClock(Clock clock) {
        this.clock = (Clock)Preconditions.checkNotNull((Object)clock);
        this.metricsCollector.setClock(clock);
        this.scalingExecutor.setClock(clock);
    }
}

