/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Template for reconciling a Flink custom resource.
 *
 * <p>{@link #reconcile} fixes the overall decision flow (first deployment, spec change, rollback,
 * other changes) and delegates the concrete actions to the abstract hooks declared below. The
 * helper methods in this class only decide <em>whether</em> an action should be taken.
 *
 * @param <CR> custom resource type handled by the reconciler
 * @param <SPEC> spec type of the custom resource
 * @param <STATUS> status type of the custom resource
 */
public abstract class AbstractFlinkResourceReconciler<
                CR extends AbstractFlinkResource<SPEC, STATUS>,
                SPEC extends AbstractFlinkSpec,
                STATUS extends CommonStatus<SPEC>>
        implements Reconciler<CR> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);

    protected final FlinkConfigManager configManager;
    protected final EventRecorder eventRecorder;
    protected final StatusRecorder<CR, STATUS> statusRecorder;
    protected final KubernetesClient kubernetesClient;

    // Message constants. MSG_SUSPENDED and MSG_SUBMIT are not referenced inside this class; they
    // are public and presumably used by subclasses or tests.
    public static final String MSG_SUSPENDED = "Suspending existing deployment.";
    /** Format string; filled with the diff type and the diff itself in {@link #reconcile}. */
    public static final String MSG_SPEC_CHANGED = "%s change(s) detected (%s), starting reconciliation.";
    public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
    public static final String MSG_SUBMIT = "Starting deployment";

    /** Time source for the readiness-timeout check; replaceable through {@link #setClock}. */
    protected Clock clock = Clock.systemDefaultZone();

    /**
     * Stores the collaborators in the corresponding fields; performs no other work.
     *
     * @param kubernetesClient Kubernetes client made available to subclasses
     * @param configManager configuration manager made available to subclasses
     * @param eventRecorder recorder used to emit spec-change and rollback events
     * @param statusRecorder recorder used to persist the status before the first deployment
     */
    public AbstractFlinkResourceReconciler(
            KubernetesClient kubernetesClient,
            FlinkConfigManager configManager,
            EventRecorder eventRecorder,
            StatusRecorder<CR, STATUS> statusRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.configManager = configManager;
        this.eventRecorder = eventRecorder;
        this.statusRecorder = statusRecorder;
    }

    /**
     * Runs one reconciliation pass for the given resource.
     *
     * <p>The steps, in order:
     *
     * <ol>
     *   <li>Return immediately if {@link #readyToReconcile} reports the resource is not ready.
     *   <li>If the resource has never been deployed, record and patch the status, call {@link
     *       #deploy} with the job's initial savepoint path (if any), mark the spec as deployed and
     *       return.
     *   <li>Otherwise diff the current spec against the last reconciled spec. A non-{@code IGNORE}
     *       diff, or a reconciliation state of {@code UPGRADING}, is treated as a spec change and
     *       handed to {@link #reconcileSpecChange}, unless the new spec equals the deployed spec.
     *   <li>With no spec change, roll back if required. The first pass only marks the status as
     *       {@code ROLLING_BACK}; {@link #rollback} is invoked on a later pass.
     *   <li>Otherwise give {@link #reconcileOtherChanges} a chance to act.
     * </ol>
     *
     * @param cr resource to reconcile
     * @param ctx operator SDK context of the current reconciliation
     * @throws Exception propagated from the subclass hooks
     */
    @Override
    public final void reconcile(CR cr, Context<?> ctx) throws Exception {
        SPEC spec = cr.getSpec();
        Configuration deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
        STATUS status = cr.getStatus();
        ReconciliationStatus<SPEC> reconciliationStatus = status.getReconciliationStatus();

        if (!readyToReconcile(cr, ctx, deployConfig)) {
            LOG.info("Not ready for reconciliation yet...");
            return;
        }

        if (reconciliationStatus.isBeforeFirstDeployment()) {
            LOG.info("Deploying for the first time");
            // The status is updated and patched before the deploy call, so the attempt is
            // recorded even if deploy(...) throws.
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
            statusRecorder.patchAndCacheStatus(cr);

            Optional<String> initialSavepoint =
                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath);
            deploy(cr, spec, status, ctx, deployConfig, initialSavepoint, false);

            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
            return;
        }

        // Spec and status are read from the resource again here, exactly as before, rather than
        // reusing the locals captured ahead of readyToReconcile(...).
        SPEC lastReconciledSpec =
                cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
        SPEC currentDeploySpec = cr.getSpec();

        // NOTE(review): argument order kept as found (current spec first, last reconciled spec
        // second). Confirm against ReflectiveDiffBuilder's constructor contract; this file cannot
        // establish which side it treats as the "before" value.
        DiffResult<SPEC> specDiff =
                new ReflectiveDiffBuilder<SPEC>(currentDeploySpec, lastReconciledSpec).build();
        DiffType diffType = specDiff.getType();

        FlinkService flinkService = getFlinkService(cr, ctx);
        boolean upgrading = reconciliationStatus.getState() == ReconciliationState.UPGRADING;
        boolean specChanged = DiffType.IGNORE != diffType || upgrading;
        Configuration observeConfig = getObserveConfig(cr, ctx);

        if (specChanged) {
            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                return;
            }
            String specChangeMessage = String.format(MSG_SPEC_CHANGED, diffType, specDiff);
            LOG.info(specChangeMessage);
            // No SpecChanged event is emitted while the state is already UPGRADING.
            if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) {
                eventRecorder.triggerEvent(
                        cr,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.SpecChanged,
                        EventRecorder.Component.JobManagerDeployment,
                        specChangeMessage);
            }
            reconcileSpecChange(cr, ctx, observeConfig, deployConfig, diffType);
        } else if (shouldRollBack(cr, observeConfig, flinkService)) {
            // Two-step rollback: the first pass only flips the state and returns.
            if (initiateRollBack(status)) {
                return;
            }
            LOG.warn(MSG_ROLLBACK);
            eventRecorder.triggerEvent(
                    cr,
                    EventRecorder.Type.Normal,
                    EventRecorder.Reason.Rollback,
                    EventRecorder.Component.JobManagerDeployment,
                    MSG_ROLLBACK);
            rollback(cr, ctx, observeConfig);
        } else if (!reconcileOtherChanges(cr, ctx, observeConfig)) {
            LOG.info("Resource fully reconciled, nothing to do...");
        }
    }

    /**
     * Builds the configuration used for deploying the resource.
     *
     * @param meta metadata of the resource being reconciled
     * @param spec spec of the resource being reconciled
     * @param ctx reconciliation context
     * @return configuration passed to {@link #readyToReconcile}, {@link #deploy} and {@link
     *     #reconcileSpecChange}
     */
    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context<?> ctx);

    /**
     * Builds the configuration used when inspecting the currently deployed resource.
     *
     * @param cr resource being reconciled
     * @param ctx reconciliation context
     * @return configuration passed to the rollback check, {@link #rollback}, {@link
     *     #reconcileSpecChange} and {@link #reconcileOtherChanges}
     */
    protected abstract Configuration getObserveConfig(CR cr, Context<?> ctx);

    /**
     * Decides whether reconciliation may proceed in this pass.
     *
     * @param cr resource being reconciled
     * @param ctx reconciliation context
     * @param deployConfig configuration returned by {@link #getDeployConfig}
     * @return {@code false} to skip this reconciliation pass entirely
     */
    protected abstract boolean readyToReconcile(CR cr, Context<?> ctx, Configuration deployConfig);

    /**
     * Applies a detected spec change.
     *
     * @param cr resource being reconciled
     * @param ctx reconciliation context
     * @param observeConfig configuration returned by {@link #getObserveConfig}
     * @param deployConfig configuration returned by {@link #getDeployConfig}
     * @param diffType type of the diff between the current and the last reconciled spec
     * @throws Exception if applying the change fails
     */
    protected abstract void reconcileSpecChange(
            CR cr,
            Context<?> ctx,
            Configuration observeConfig,
            Configuration deployConfig,
            DiffType diffType)
            throws Exception;

    /**
     * Executes a rollback. Called only on a pass where the reconciliation state was already
     * {@code ROLLING_BACK}.
     *
     * @param cr resource being reconciled
     * @param ctx reconciliation context
     * @param observeConfig configuration returned by {@link #getObserveConfig}
     * @throws Exception if the rollback fails
     */
    protected abstract void rollback(CR cr, Context<?> ctx, Configuration observeConfig)
            throws Exception;

    /**
     * Hook invoked when neither a spec change nor a rollback applies.
     *
     * @param cr resource being reconciled
     * @param ctx reconciliation context
     * @param observeConfig configuration returned by {@link #getObserveConfig}
     * @return {@code false} if nothing was done; {@link #reconcile} then logs that the resource is
     *     fully reconciled
     * @throws Exception if the additional reconciliation fails
     */
    protected abstract boolean reconcileOtherChanges(
            CR cr, Context<?> ctx, Configuration observeConfig) throws Exception;

    /**
     * Delegates resource cleanup to {@link #cleanupInternal}.
     *
     * @param resource resource being deleted
     * @param context reconciliation context
     * @return the delete control produced by {@link #cleanupInternal}
     */
    @Override
    public final DeleteControl cleanup(CR resource, Context<?> context) {
        return cleanupInternal(resource, context);
    }

    /**
     * Deploys the resource.
     *
     * @param cr resource being deployed
     * @param spec spec to deploy
     * @param status status object of the resource
     * @param ctx reconciliation context
     * @param deployConfig configuration returned by {@link #getDeployConfig}
     * @param savepoint savepoint path to start from; on first deployment this is the job's initial
     *     savepoint path, empty when there is no job or no path
     * @param var7 implementation-defined switch; this class only ever passes {@code false} (first
     *     deployment). NOTE(review): the original parameter name was lost in decompilation and its
     *     meaning is not visible in this file; restore it from the implementing classes.
     * @throws Exception if the deployment fails
     */
    protected abstract void deploy(
            CR cr,
            SPEC spec,
            STATUS status,
            Context<?> ctx,
            Configuration deployConfig,
            Optional<String> savepoint,
            boolean var7)
            throws Exception;

    /**
     * Performs the actual cleanup for {@link #cleanup}.
     *
     * @param resource resource being deleted
     * @param context reconciliation context
     * @return delete control returned to the operator SDK
     */
    protected abstract DeleteControl cleanupInternal(CR resource, Context<?> context);

    /**
     * Provides the Flink service used for the HA-metadata check during rollback decisions.
     *
     * @param cr resource being reconciled
     * @param ctx reconciliation context
     * @return Flink service for the resource
     */
    protected abstract FlinkService getFlinkService(CR cr, Context<?> ctx);

    /**
     * Checks whether the spec on the resource equals the spec reported as deployed. If so, the
     * status is updated to mark it as deployed and no upgrade is needed.
     *
     * @return {@code true} if the spec was already deployed and the status has been updated;
     *     always {@code false} while the reconciliation state is {@code UPGRADING}
     */
    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
        ReconciliationState state = resource.getStatus().getReconciliationStatus().getState();
        if (state == ReconciliationState.UPGRADING) {
            return false;
        }

        SPEC deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
        if (!resource.getSpec().equals(deployedSpec)) {
            return false;
        }

        LOG.info("The new spec matches the currently deployed last stable spec. No upgrade needed.");
        ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConf);
        return true;
    }

    /**
     * Decides whether the resource should be rolled back to its last stable spec.
     *
     * <p>A state of {@code ROLLING_BACK} always yields {@code true}. Otherwise every one of the
     * following must hold: rollback is enabled in {@code configuration}; the state is not {@code
     * ROLLED_BACK}; the last reconciled spec is not marked stable; a last stable spec exists and
     * its job (if any) is not {@code SUSPENDED}; the Flink version did not change; the configured
     * readiness timeout has fully elapsed since the recorded reconciliation timestamp; and HA
     * metadata is available.
     */
    private boolean shouldRollBack(
            AbstractFlinkResource<SPEC, STATUS> resource,
            Configuration configuration,
            FlinkService flinkService) {
        ReconciliationStatus<SPEC> reconciliationStatus =
                resource.getStatus().getReconciliationStatus();
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return true;
        }

        if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
                || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
                || reconciliationStatus.isLastReconciledSpecStable()) {
            return false;
        }

        SPEC lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        if (lastStableSpec == null) {
            // Nothing to roll back to.
            return false;
        }

        JobSpec lastStableJob = lastStableSpec.getJob();
        if (lastStableJob != null && lastStableJob.getState() == JobState.SUSPENDED) {
            // Never roll back to a suspended job.
            return false;
        }

        if (flinkVersionChanged(resource.getSpec(), lastStableSpec)) {
            // Never roll back across a Flink version change.
            return false;
        }

        Duration readinessTimeout =
                configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
        Instant lastReconciliation =
                Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp());
        boolean timeoutElapsed =
                clock.instant().minus(readinessTimeout).isAfter(lastReconciliation);
        if (!timeoutElapsed) {
            return false;
        }

        boolean haDataAvailable = flinkService.isHaMetadataAvailable(configuration);
        if (!haDataAvailable) {
            LOG.warn("Rollback is not possible due to missing HA metadata");
        }
        return haDataAvailable;
    }

    /**
     * First step of the two-step rollback: moves the reconciliation state to {@code ROLLING_BACK}
     * and sets a default error message if the status has none.
     *
     * @return {@code true} if the state was changed by this call; {@code false} if it already was
     *     {@code ROLLING_BACK}, in which case the caller proceeds with the actual rollback
     */
    private boolean initiateRollBack(STATUS status) {
        ReconciliationStatus<SPEC> reconciliationStatus = status.getReconciliationStatus();
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            return false;
        }

        LOG.warn("Preparing to roll back to last stable spec.");
        if (StringUtils.isEmpty(status.getError())) {
            status.setError("Deployment is not ready within the configured timeout, rolling back.");
        }
        reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
        return true;
    }

    /**
     * Decides whether a lost JobManager deployment should be recovered.
     *
     * @param conf configuration carrying the recovery and HA settings
     * @param deployment deployment to inspect
     * @return {@code true} only if JobManager deployment recovery is enabled, the JobManager
     *     deployment is missing for a deployment that should be running, and Kubernetes HA is
     *     activated
     */
    protected boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment deployment) {
        if (!conf.get(KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) {
            return false;
        }

        LOG.debug("Checking whether jobmanager deployment needs recovery");
        if (!jmMissingForRunningDeployment(deployment)) {
            return false;
        }

        LOG.debug("Jobmanager deployment is missing, trying to recover");
        if (FlinkUtils.isKubernetesHAActivated(conf)) {
            LOG.debug("HA is enabled, recovering lost jobmanager deployment");
            return true;
        }

        LOG.warn("Could not recover lost jobmanager deployment without HA enabled");
        return false;
    }

    /**
     * Returns whether the JobManager deployment status is {@code MISSING} while the deployed spec
     * has either no job or a job in {@code RUNNING} state.
     */
    private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
        JobSpec deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
        boolean expectedToRun = deployedJob == null || deployedJob.getState() == JobState.RUNNING;
        return expectedToRun
                && deployment.getStatus().getJobManagerDeploymentStatus()
                        == JobManagerDeploymentStatus.MISSING;
    }

    /**
     * Compares the Flink versions of two specs.
     *
     * @param oldSpec first spec; only inspected if it is a {@link FlinkDeploymentSpec}
     * @param newSpec second spec; must be a non-null {@link FlinkDeploymentSpec} whenever {@code
     *     oldSpec} is one
     * @return {@code true} if both are deployment specs with different Flink versions; {@code
     *     false} if {@code oldSpec} is not a {@link FlinkDeploymentSpec}
     */
    protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
        if (!(oldSpec instanceof FlinkDeploymentSpec)) {
            return false;
        }
        // Reference comparison kept as found. NOTE(review): correct only if the Flink version
        // type is an enum; confirm.
        return ((FlinkDeploymentSpec) oldSpec).getFlinkVersion()
                != ((FlinkDeploymentSpec) newSpec).getFlinkVersion();
    }

    /**
     * Sets {@link KubernetesConfigOptions#JOB_MANAGER_OWNER_REFERENCE} on {@code deployConfig} to
     * a single owner reference describing {@code owner}, with {@code blockOwnerDeletion} and
     * {@code controller} both {@code "false"}.
     *
     * @param owner resource that should own the JobManager deployment; its apiVersion, kind, name
     *     and uid must be non-null because {@link Map#of} rejects null values
     * @param deployConfig configuration to modify in place
     */
    protected void setOwnerReference(CR owner, Configuration deployConfig) {
        ObjectMeta ownerMeta = owner.getMetadata();
        Map<String, String> ownerReference =
                Map.of(
                        "apiVersion", owner.getApiVersion(),
                        "kind", owner.getKind(),
                        "name", ownerMeta.getName(),
                        "uid", ownerMeta.getUid(),
                        "blockOwnerDeletion", "false",
                        "controller", "false");
        deployConfig.set(
                KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, List.of(ownerReference));
    }

    /** Replaces the clock used for the rollback readiness-timeout check. */
    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = clock;
    }
}

