/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.Optional;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJobReconciler<CR extends AbstractFlinkResource<SPEC, STATUS>, SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>>
extends AbstractFlinkResourceReconciler<CR, SPEC, STATUS> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobReconciler.class);

    public AbstractJobReconciler(KubernetesClient kubernetesClient, FlinkConfigManager configManager, EventRecorder eventRecorder, StatusRecorder<CR, STATUS> statusRecorder) {
        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
    }

    @Override
    public boolean readyToReconcile(CR resource, Context<?> context, Configuration deployConfig) {
        if (this.shouldWaitForPendingSavepoint(((CommonStatus)resource.getStatus()).getJobStatus(), this.getDeployConfig(resource.getMetadata(), (AbstractFlinkSpec)resource.getSpec(), context))) {
            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
            return false;
        }
        return true;
    }

    private boolean shouldWaitForPendingSavepoint(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus, Configuration conf) {
        return !conf.getBoolean(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT) && SavepointUtils.savepointInProgress(jobStatus);
    }

    @Override
    protected void reconcileSpecChange(CR resource, Context<?> ctx, Configuration observeConfig, Configuration deployConfig, DiffType diffType) throws Exception {
        boolean scaled;
        CommonStatus status = (CommonStatus)resource.getStatus();
        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
        AbstractFlinkSpec lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
        AbstractFlinkSpec currentDeploySpec = (AbstractFlinkSpec)resource.getSpec();
        if (diffType == DiffType.SCALE && (scaled = this.getFlinkService(resource, ctx).scale(resource.getMetadata(), ((AbstractFlinkSpec)resource.getSpec()).getJob(), deployConfig))) {
            LOG.info("Reactive scaling succeeded");
            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
            return;
        }
        JobState currentJobState = lastReconciledSpec.getJob().getState();
        JobState desiredJobState = currentDeploySpec.getJob().getState();
        if (currentJobState == JobState.RUNNING) {
            Optional<UpgradeMode> availableUpgradeMode;
            if (desiredJobState == JobState.RUNNING) {
                LOG.info("Upgrading/Restarting running job, suspending first...");
            }
            if ((availableUpgradeMode = this.getAvailableUpgradeMode(resource, deployConfig, observeConfig)).isEmpty()) {
                return;
            }
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)resource, EventRecorder.Type.Normal, EventRecorder.Reason.Suspended, EventRecorder.Component.JobManagerDeployment, "Suspending existing deployment.");
            currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
            this.cancelJob(resource, ctx, availableUpgradeMode.get(), observeConfig);
            if (desiredJobState == JobState.RUNNING) {
                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
            } else {
                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
            }
        }
        if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
            this.statusRecorder.patchAndCacheStatus(resource);
            this.restoreJob(resource, currentDeploySpec, status, ctx, deployConfig, lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
        }
    }

    protected Optional<UpgradeMode> getAvailableUpgradeMode(CR resource, Configuration deployConfig, Configuration observeConfig) {
        CommonStatus status = (CommonStatus)resource.getStatus();
        UpgradeMode upgradeMode = ((AbstractFlinkSpec)resource.getSpec()).getJob().getUpgradeMode();
        if (upgradeMode == UpgradeMode.STATELESS) {
            LOG.info("Stateless job, ready for upgrade");
            return Optional.of(UpgradeMode.STATELESS);
        }
        if (ReconciliationUtils.isJobInTerminalState(status)) {
            LOG.info("Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
            return Optional.of(UpgradeMode.SAVEPOINT);
        }
        if (ReconciliationUtils.isJobRunning(status)) {
            LOG.info("Job is in running state, ready for upgrade with {}", (Object)upgradeMode);
            boolean changedToLastStateWithoutHa = ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(resource, observeConfig);
            if (changedToLastStateWithoutHa) {
                LOG.info("Using savepoint upgrade mode when switching to last-state without HA previously enabled");
                return Optional.of(UpgradeMode.SAVEPOINT);
            }
            if (this.flinkVersionChanged(ReconciliationUtils.getDeployedSpec(resource), (AbstractFlinkSpec)resource.getSpec())) {
                LOG.info("Using savepoint upgrade mode when upgrading Flink version");
                return Optional.of(UpgradeMode.SAVEPOINT);
            }
            return Optional.of(upgradeMode);
        }
        return Optional.empty();
    }

    protected void restoreJob(CR resource, SPEC spec, STATUS status, Context<?> ctx, Configuration deployConfig, boolean requireHaMetadata) throws Exception {
        Optional<String> savepointOpt = Optional.empty();
        if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
            savepointOpt = Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint()).flatMap(s -> Optional.ofNullable(s.getLocation()));
        }
        this.deploy(resource, spec, status, ctx, deployConfig, savepointOpt, requireHaMetadata);
    }

    @Override
    protected void rollback(CR resource, Context<?> ctx, Configuration observeConfig) throws Exception {
        ReconciliationStatus reconciliationStatus = ((CommonStatus)resource.getStatus()).getReconciliationStatus();
        AbstractFlinkSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
        UpgradeMode upgradeMode = ((AbstractFlinkSpec)resource.getSpec()).getJob().getUpgradeMode();
        this.cancelJob(resource, ctx, upgradeMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.LAST_STATE, observeConfig);
        this.restoreJob(resource, rollbackSpec, (CommonStatus)resource.getStatus(), ctx, this.getDeployConfig(resource.getMetadata(), rollbackSpec, ctx), upgradeMode != UpgradeMode.STATELESS);
        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
    }

    @Override
    public boolean reconcileOtherChanges(CR resource, Context<?> context, Configuration observeConfig) throws Exception {
        JobStatus jobStatus = JobStatus.valueOf((String)((CommonStatus)resource.getStatus()).getJobStatus().getState());
        if (jobStatus == JobStatus.FAILED && observeConfig.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED)) {
            LOG.info("Stopping failed Flink job...");
            this.cleanupAfterFailedJob(resource, context, observeConfig);
            ((CommonStatus)resource.getStatus()).setError(null);
            this.resubmitJob(resource, context, observeConfig, false);
            return true;
        }
        return SavepointUtils.triggerSavepointIfNeeded(this.getFlinkService(resource, context), resource, observeConfig);
    }

    protected void resubmitJob(CR deployment, Context<?> ctx, Configuration observeConfig, boolean requireHaMetadata) throws Exception {
        LOG.info("Resubmitting Flink job...");
        Object specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
        this.restoreJob(deployment, specToRecover, (CommonStatus)deployment.getStatus(), ctx, observeConfig, requireHaMetadata);
    }

    protected abstract void cancelJob(CR var1, Context<?> var2, UpgradeMode var3, Configuration var4) throws Exception;

    protected abstract void cleanupAfterFailedJob(CR var1, Context<?> var2, Configuration var3) throws Exception;
}

