/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFlinkResourceReconciler<CR extends AbstractFlinkResource<SPEC, STATUS>, SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>>
implements Reconciler<CR> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);
    protected final EventRecorder eventRecorder;
    protected final StatusRecorder<CR, STATUS> statusRecorder;
    protected final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler;
    public static final String MSG_SUSPENDED = "Suspending existing deployment.";
    public static final String MSG_SPEC_CHANGED = "%s change(s) detected (%s), starting reconciliation.";
    public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
    public static final String MSG_SUBMIT = "Starting deployment";
    protected Clock clock = Clock.systemDefaultZone();

    public AbstractFlinkResourceReconciler(EventRecorder eventRecorder, StatusRecorder<CR, STATUS> statusRecorder, JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
        this.eventRecorder = eventRecorder;
        this.statusRecorder = statusRecorder;
        this.autoscaler = autoscaler;
    }

    @Override
    public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
        boolean specChanged;
        CR cr = ctx.getResource();
        CommonStatus status = (CommonStatus)cr.getStatus();
        ReconciliationStatus reconciliationStatus = ((CommonStatus)cr.getStatus()).getReconciliationStatus();
        if (!this.readyToReconcile(ctx)) {
            LOG.info("Not ready for reconciliation yet...");
            return;
        }
        if (reconciliationStatus.isBeforeFirstDeployment()) {
            AbstractFlinkSpec spec = (AbstractFlinkSpec)cr.getSpec();
            if (spec.getJob() != null && spec.getJob().getState().equals((Object)JobState.SUSPENDED)) {
                return;
            }
            LOG.info("Deploying for the first time");
            Configuration deployConfig = ctx.getDeployConfig(spec);
            this.updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status, ctx.getKubernetesClient());
            this.deploy(ctx, spec, deployConfig, Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath), false);
            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, this.clock);
            return;
        }
        AbstractFlinkSpec lastReconciledSpec = ((CommonStatus)cr.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        AbstractFlinkSpec currentDeploySpec = (AbstractFlinkSpec)cr.getSpec();
        this.applyAutoscaler(ctx);
        ReconciliationState reconciliationState = reconciliationStatus.getState();
        Object specDiff = new ReflectiveDiffBuilder<AbstractFlinkSpec>(ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec).build();
        DiffType diffType = ((DiffResult)specDiff).getType();
        boolean bl = specChanged = DiffType.IGNORE != diffType || reconciliationState == ReconciliationState.UPGRADING;
        if (this.shouldRollBack(ctx, specChanged, lastReconciledSpec)) {
            this.prepareCrForRollback(ctx, specChanged, lastReconciledSpec);
            specChanged = true;
            diffType = DiffType.UPGRADE;
        }
        if (specChanged) {
            boolean scaled;
            Configuration deployConfig = ctx.getDeployConfig((AbstractFlinkSpec)cr.getSpec());
            if (this.checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                return;
            }
            this.triggerSpecChangeEvent(cr, (DiffResult<SPEC>)specDiff, ctx.getKubernetesClient());
            boolean bl2 = scaled = diffType != DiffType.UPGRADE && this.scale(ctx, deployConfig);
            if (scaled || this.reconcileSpecChange(ctx, deployConfig, lastReconciledSpec)) {
                return;
            }
        } else {
            ReconciliationUtils.updateReconciliationMetadata(cr);
        }
        if (!this.reconcileOtherChanges(ctx)) {
            LOG.info("Resource fully reconciled, nothing to do...");
        }
    }

    private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
        KubernetesJobAutoScalerContext autoScalerCtx = ctx.getJobAutoScalerContext();
        boolean autoscalerEnabled = ((AbstractFlinkSpec)ctx.getResource().getSpec()).getJob() != null && ctx.getObserveConfig().getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED);
        autoScalerCtx.getConfiguration().set(AutoScalerOptions.AUTOSCALER_ENABLED, (Object)autoscalerEnabled);
        this.autoscaler.scale((JobAutoScalerContext)autoScalerCtx);
    }

    private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
        this.eventRecorder.triggerEventOnce((AbstractFlinkResource<?, ?>)cr, EventRecorder.Type.Normal, EventRecorder.Reason.SpecChanged, String.format(MSG_SPEC_CHANGED, specDiff.getType(), specDiff), EventRecorder.Component.JobManagerDeployment, "SpecChange: " + cr.getMetadata().getGeneration(), client);
    }

    private void updateStatusBeforeFirstDeployment(CR cr, SPEC spec, Configuration deployConfig, STATUS status, KubernetesClient client) {
        if (spec.getJob() != null) {
            UpgradeMode initialUpgradeMode = UpgradeMode.STATELESS;
            String initialSp = spec.getJob().getInitialSavepointPath();
            if (initialSp != null) {
                status.getJobStatus().getSavepointInfo().setLastSavepoint(Savepoint.of((String)initialSp, (SnapshotTriggerType)SnapshotTriggerType.UNKNOWN));
                initialUpgradeMode = UpgradeMode.SAVEPOINT;
            }
            spec.getJob().setUpgradeMode(initialUpgradeMode);
        }
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig, this.clock);
        this.statusRecorder.patchAndCacheStatus(cr, client);
    }

    protected abstract boolean readyToReconcile(FlinkResourceContext<CR> var1);

    protected abstract boolean reconcileSpecChange(FlinkResourceContext<CR> var1, Configuration var2, SPEC var3) throws Exception;

    protected abstract boolean reconcileOtherChanges(FlinkResourceContext<CR> var1) throws Exception;

    @Override
    public DeleteControl cleanup(FlinkResourceContext<CR> ctx) {
        this.autoscaler.cleanup((Object)ResourceID.fromResource(ctx.getResource()));
        return this.cleanupInternal(ctx);
    }

    @VisibleForTesting
    public abstract void deploy(FlinkResourceContext<CR> var1, SPEC var2, Configuration var3, Optional<String> var4, boolean var5) throws Exception;

    protected abstract DeleteControl cleanupInternal(FlinkResourceContext<CR> var1);

    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
        if (((CommonStatus)resource.getStatus()).getReconciliationStatus().getState() == ReconciliationState.UPGRADING || ((CommonStatus)resource.getStatus()).getReconciliationStatus().getState() == ReconciliationState.ROLLING_BACK) {
            return false;
        }
        Object deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
        if (((AbstractFlinkSpec)resource.getSpec()).equals(deployedSpec)) {
            LOG.info("The new spec matches the currently deployed last stable spec. No upgrade needed.");
            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConf, this.clock);
            return true;
        }
        return false;
    }

    private boolean scale(FlinkResourceContext<CR> ctx, Configuration deployConfig) throws Exception {
        FlinkService.ScalingResult scalingResult = ctx.getFlinkService().scale(ctx, deployConfig);
        if (scalingResult == FlinkService.ScalingResult.CANNOT_SCALE) {
            return false;
        }
        ReconciliationUtils.updateAfterScaleUp(ctx.getResource(), deployConfig, this.clock, scalingResult);
        return true;
    }

    private boolean shouldRollBack(FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) {
        CR resource = ctx.getResource();
        ReconciliationStatus reconciliationStatus = ((CommonStatus)resource.getStatus()).getReconciliationStatus();
        Configuration configuration = ctx.getObserveConfig();
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return true;
        }
        if (specChanged) {
            return false;
        }
        if (!((Boolean)configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)).booleanValue() || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK || reconciliationStatus.isLastReconciledSpecStable()) {
            return false;
        }
        AbstractFlinkSpec lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        if (lastStableSpec == null) {
            return false;
        }
        if (lastStableSpec.getJob() != null && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
            return false;
        }
        if (this.flinkVersionChanged((AbstractFlinkSpec)resource.getSpec(), lastStableSpec)) {
            return false;
        }
        Duration readinessTimeout = (Duration)configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
        if (!this.clock.instant().minus(readinessTimeout).isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()))) {
            return false;
        }
        if (lastReconciledSpec.getJob() != null && lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            return true;
        }
        boolean haDataAvailable = ctx.getFlinkService().isHaMetadataAvailable(configuration);
        if (!haDataAvailable) {
            LOG.warn("Rollback is not possible due to missing HA metadata");
        }
        return haDataAvailable;
    }

    private void prepareCrForRollback(FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) {
        CR cr = ctx.getResource();
        CommonStatus status = (CommonStatus)cr.getStatus();
        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
        if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) {
            reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
            LOG.warn(MSG_ROLLBACK);
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)ctx.getResource(), EventRecorder.Type.Normal, EventRecorder.Reason.Rollback, EventRecorder.Component.JobManagerDeployment, MSG_ROLLBACK, ctx.getKubernetesClient());
        } else if (lastReconciledSpec.getJob() != null) {
            lastReconciledSpec.getJob().setState(JobState.SUSPENDED);
        }
        if (specChanged) {
            reconciliationStatus.setState(ReconciliationState.UPGRADING);
        } else {
            cr.setSpec((Object)reconciliationStatus.deserializeLastStableSpec());
            JobSpec job = ((AbstractFlinkSpec)cr.getSpec()).getJob();
            if (job != null) {
                job.setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.LAST_STATE);
            }
        }
    }

    protected boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment deployment) {
        boolean result = false;
        if (((Boolean)conf.get(KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)).booleanValue()) {
            LOG.debug("Checking whether jobmanager deployment needs recovery");
            if (this.jmMissingForRunningDeployment(deployment)) {
                boolean stateless;
                LOG.debug("Jobmanager deployment is missing, trying to recover");
                JobSpec jobSpec = ((FlinkDeploymentSpec)deployment.getSpec()).getJob();
                boolean bl = stateless = jobSpec != null && jobSpec.getUpgradeMode() == UpgradeMode.STATELESS;
                if (stateless || HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)conf)) {
                    LOG.debug("HA is enabled, recovering lost jobmanager deployment");
                    result = true;
                } else {
                    LOG.warn("Could not recover lost jobmanager deployment without HA enabled");
                }
            }
        }
        return result;
    }

    private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
        JobSpec deployedJob = ((FlinkDeploymentSpec)ReconciliationUtils.getDeployedSpec(deployment)).getJob();
        return (deployedJob == null || deployedJob.getState() == JobState.RUNNING) && ((FlinkDeploymentStatus)deployment.getStatus()).getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
    }

    protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
        if (oldSpec instanceof FlinkDeploymentSpec) {
            return ((FlinkDeploymentSpec)oldSpec).getFlinkVersion() != ((FlinkDeploymentSpec)newSpec).getFlinkVersion();
        }
        return false;
    }

    protected void setOwnerReference(CR owner, Configuration deployConfig) {
        Map<String, String> ownerReference = Map.of("apiVersion", owner.getApiVersion(), "kind", owner.getKind(), "name", owner.getMetadata().getName(), "uid", owner.getMetadata().getUid(), "blockOwnerDeletion", "true", "controller", "false");
        deployConfig.set(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, List.of(ownerReference));
    }

    @VisibleForTesting
    public void setClock(Clock clock) {
        this.clock = clock;
    }
}

