/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJobReconciler<CR extends AbstractFlinkResource<SPEC, STATUS>, SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>>
extends AbstractFlinkResourceReconciler<CR, SPEC, STATUS> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobReconciler.class);

    /**
     * Creates a job reconciler; all arguments are forwarded unchanged to the
     * {@link AbstractFlinkResourceReconciler} constructor.
     *
     * @param eventRecorder recorder used by this class to emit events (e.g. when suspending a deployment)
     * @param statusRecorder recorder used by this class to patch and cache the resource status
     * @param autoscaler autoscaler instance handed to the parent reconciler
     */
    public AbstractJobReconciler(EventRecorder eventRecorder, StatusRecorder<CR, STATUS> statusRecorder, JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
        super(eventRecorder, statusRecorder, autoscaler);
    }

    /**
     * Decides whether reconciliation may proceed for the given resource.
     *
     * <p>A resource that has never been deployed is always ready. Otherwise reconciliation is
     * postponed while {@link #shouldWaitForPendingSavepoint} reports that a savepoint must
     * finish first.
     *
     * @param ctx context of the resource being reconciled
     * @return {@code true} if reconciliation can run now, {@code false} if it must be delayed
     */
    @Override
    public boolean readyToReconcile(FlinkResourceContext<CR> ctx) {
        CommonStatus resourceStatus = (CommonStatus)ctx.getResource().getStatus();
        boolean neverDeployed = resourceStatus.getReconciliationStatus().isBeforeFirstDeployment();
        // The savepoint check (and the observe-config lookup) only runs once a deployment exists.
        boolean mustWait = !neverDeployed && this.shouldWaitForPendingSavepoint(resourceStatus.getJobStatus(), ctx.getObserveConfig());
        if (mustWait) {
            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
        }
        return !mustWait;
    }

    /**
     * Tells whether reconciliation has to wait for a savepoint that is still in progress.
     *
     * @param jobStatus job status inspected for an in-progress savepoint
     * @param conf configuration consulted for {@code JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT}
     * @return {@code true} only if pending savepoints are not ignored by configuration and
     *     {@link SnapshotUtils#savepointInProgress} reports one for {@code jobStatus}
     */
    private boolean shouldWaitForPendingSavepoint(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus, Configuration conf) {
        if (conf.getBoolean(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT)) {
            // Pending savepoints are explicitly ignored; no need to look at the job status.
            return false;
        }
        return SnapshotUtils.savepointInProgress(jobStatus);
    }

    /**
     * Applies a spec change by comparing the job state of the last reconciled spec with the
     * job state requested in the resource's current spec.
     *
     * <ul>
     *   <li>{@link DiffType#SAVEPOINT_REDEPLOY}: handled entirely by {@code redeployWithSavepoint}.
     *   <li>Last reconciled state {@code RUNNING}: the job is cancelled using the upgrade mode
     *       returned by {@link #getAvailableUpgradeMode}; if none is available nothing is done.
     *   <li>Last reconciled state {@code SUSPENDED} and desired state {@code RUNNING}: the job is
     *       restored via {@link #restoreJob}.
     * </ul>
     *
     * <p>{@code currentJobState} is read once from {@code lastReconciledSpec}, so a job that is
     * suspended by the {@code RUNNING} branch is not restored within the same invocation.
     *
     * @param diffType kind of spec difference that was detected
     * @param ctx context of the resource being reconciled
     * @param deployConfig configuration used for status updates and deployment
     * @param lastReconciledSpec spec that was reconciled most recently
     * @return {@code false} if the job is running but no upgrade mode is currently available,
     *     {@code true} otherwise
     * @throws Exception if cancelling, deploying or a status update fails
     */
    @Override
    protected boolean reconcileSpecChange(DiffType diffType, FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC lastReconciledSpec) throws Exception {
        CR resource = ctx.getResource();
        // Typed as STATUS/SPEC rather than the raw supertypes: redeployWithSavepoint and
        // restoreJob declare exactly these parameter types.
        STATUS status = resource.getStatus();
        SPEC currentDeploySpec = resource.getSpec();
        JobState currentJobState = lastReconciledSpec.getJob().getState();
        JobState desiredJobState = currentDeploySpec.getJob().getState();

        if (diffType == DiffType.SAVEPOINT_REDEPLOY) {
            this.redeployWithSavepoint(ctx, deployConfig, resource, status, currentDeploySpec, desiredJobState);
            return true;
        }

        if (currentJobState == JobState.RUNNING) {
            if (desiredJobState == JobState.RUNNING) {
                LOG.info("Upgrading/Restarting running job, suspending first...");
            }
            AvailableUpgradeMode availableUpgradeMode = this.getAvailableUpgradeMode(ctx, deployConfig);
            if (!availableUpgradeMode.isAvailable()) {
                return false;
            }
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)resource, EventRecorder.Type.Normal, EventRecorder.Reason.Suspended, EventRecorder.Component.JobManagerDeployment, "Suspending existing deployment.", ctx.getKubernetesClient());
            // Safe: isAvailable() above guarantees the Optional is present.
            UpgradeMode upgradeMode = availableUpgradeMode.getUpgradeMode().get();
            // Store the mode actually used in the spec before the status update below records it.
            currentDeploySpec.getJob().setUpgradeMode(upgradeMode);
            this.cancelJob(ctx, upgradeMode);
            if (desiredJobState == JobState.RUNNING) {
                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig, this.clock);
            } else {
                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, this.clock);
            }
        }

        if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
            // Inherit the upgrade mode of the last reconciled spec unless STATELESS was requested.
            if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
                currentDeploySpec.getJob().setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
            }
            // Record and persist the target spec before the deployment is attempted.
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig, this.clock);
            this.statusRecorder.patchAndCacheStatus(resource, ctx.getKubernetesClient());
            // HA metadata is required only if the last reconciled spec used LAST_STATE.
            this.restoreJob(ctx, currentDeploySpec, deployConfig, lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, this.clock);
        }
        return true;
    }

    /**
     * Works out which upgrade mode can currently be used for the job of the given resource.
     *
     * <p>Decision order:
     * <ol>
     *   <li>spec requests {@code STATELESS}: {@code STATELESS};
     *   <li>job is in a terminal state and no HA metadata is available: {@code SAVEPOINT};
     *   <li>job is not running: unavailable;
     *   <li>upgrade mode changed to last-state while HA was previously disabled: {@code SAVEPOINT};
     *   <li>Flink version changed: {@code SAVEPOINT};
     *   <li>spec requests {@code LAST_STATE}: result of {@link #changeLastStateIfCheckpointTooOld};
     *   <li>otherwise: {@code SAVEPOINT}.
     * </ol>
     *
     * @param ctx context of the resource being reconciled
     * @param deployConfig configuration forwarded to {@link #changeLastStateIfCheckpointTooOld}
     * @return the usable upgrade mode, or a result without a mode if none can be used right now
     * @throws Exception if querying the Flink service fails
     */
    protected AvailableUpgradeMode getAvailableUpgradeMode(FlinkResourceContext<CR> ctx, Configuration deployConfig) throws Exception {
        CR cr = ctx.getResource();
        CommonStatus currentStatus = (CommonStatus)cr.getStatus();
        UpgradeMode requestedMode = ((AbstractFlinkSpec)cr.getSpec()).getJob().getUpgradeMode();

        if (requestedMode == UpgradeMode.STATELESS) {
            LOG.info("Stateless job, ready for upgrade");
            return AvailableUpgradeMode.of(UpgradeMode.STATELESS);
        }

        FlinkService service = ctx.getFlinkService();
        boolean terminalWithoutHaMetadata = ReconciliationUtils.isJobInTerminalState(currentStatus) && !service.isHaMetadataAvailable(ctx.getObserveConfig());
        if (terminalWithoutHaMetadata) {
            LOG.info("Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
        }

        if (!ReconciliationUtils.isJobRunning(currentStatus)) {
            return AvailableUpgradeMode.unavailable();
        }

        LOG.info("Job is in running state, ready for upgrade with {}", (Object)requestedMode);
        if (ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(cr, ctx.getObserveConfig())) {
            LOG.info("Using savepoint upgrade mode when switching to last-state without HA previously enabled");
            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
        }
        if (this.flinkVersionChanged(ReconciliationUtils.getDeployedSpec(cr), (AbstractFlinkSpec)cr.getSpec())) {
            LOG.info("Using savepoint upgrade mode when upgrading Flink version");
            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
        }
        return requestedMode == UpgradeMode.LAST_STATE
                ? this.changeLastStateIfCheckpointTooOld(ctx, deployConfig)
                : AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    /**
     * Checks whether a last-state upgrade is acceptable given the configured maximum
     * checkpoint age, and picks a different outcome if it is not.
     *
     * <ul>
     *   <li>No max age configured: {@code LAST_STATE}.
     *   <li>Job start time within the max age: {@code LAST_STATE}, without querying checkpoints.
     *   <li>Latest completed checkpoint within the max age: {@code LAST_STATE}.
     *   <li>Only a pending checkpoint within the max age: {@link AvailableUpgradeMode#pendingUpgrade()}.
     *   <li>Otherwise: {@code SAVEPOINT}.
     * </ul>
     *
     * @param ctx context of the resource being reconciled
     * @param deployConfig configuration read for
     *     {@code OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE}
     * @return the upgrade mode to use, or a result without a mode while a checkpoint is pending
     * @throws Exception if the checkpoint information cannot be retrieved
     */
    @VisibleForTesting
    protected AvailableUpgradeMode changeLastStateIfCheckpointTooOld(FlinkResourceContext<CR> ctx, Configuration deployConfig) throws Exception {
        Duration maxAge = (Duration)deployConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE);
        if (maxAge == null) {
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((CommonStatus)ctx.getResource().getStatus()).getJobStatus();
        JobID jobId = JobID.fromHexString((String)jobStatus.getJobId());
        Instant startTime = Instant.ofEpochMilli(Long.parseLong(jobStatus.getStartTime()));

        // Anything strictly after this instant counts as "recent enough".
        Instant oldestAcceptable = this.clock.instant().minus(maxAge);
        Predicate<Instant> withinMaxAge = oldestAcceptable::isBefore;

        // A recently started job needs no checkpoint query.
        if (withinMaxAge.test(startTime)) {
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }

        Tuple2<Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>, Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>> chkInfo = ctx.getFlinkService().getCheckpointInfo(jobId, ctx.getObserveConfig());
        // Instant.MIN stands in for "no such checkpoint" and never passes the age check.
        Instant completedTs = chkInfo.f0
                .map(CheckpointHistoryWrapper.CompletedCheckpointInfo::getTimestamp)
                .map(Instant::ofEpochMilli)
                .orElse(Instant.MIN);
        Instant pendingTs = chkInfo.f1
                .map(CheckpointHistoryWrapper.PendingCheckpointInfo::getTimestamp)
                .map(Instant::ofEpochMilli)
                .orElse(Instant.MIN);

        if (withinMaxAge.test(completedTs)) {
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }
        if (withinMaxAge.test(pendingTs)) {
            LOG.info("Waiting for pending checkpoint to complete before upgrading.");
            return AvailableUpgradeMode.pendingUpgrade();
        }
        LOG.info("Using savepoint upgrade mode because latest checkpoint is too old for last-state upgrade");
        return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    /**
     * Deploys the job described by {@code spec}, restoring from the last recorded savepoint
     * where applicable.
     *
     * <p>For {@code STATELESS} upgrade mode no savepoint is passed on. For every other mode the
     * location of the last savepoint stored in the resource status is used, if both the
     * savepoint and its location are non-null.
     *
     * @param ctx context of the resource being reconciled
     * @param spec spec to deploy
     * @param deployConfig configuration to deploy with
     * @param requireHaMetadata forwarded unchanged to {@code deploy}
     * @throws Exception if the deployment fails
     */
    protected void restoreJob(FlinkResourceContext<CR> ctx, SPEC spec, Configuration deployConfig, boolean requireHaMetadata) throws Exception {
        Optional<String> savepointOpt;
        if (spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
            savepointOpt = Optional.empty();
        } else {
            // Optional.map wraps a null location into an empty Optional, same as flatMap + ofNullable.
            savepointOpt = Optional.ofNullable(((CommonStatus)ctx.getResource().getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint())
                    .map(lastSavepoint -> lastSavepoint.getLocation());
        }
        this.deploy(ctx, spec, deployConfig, savepointOpt, requireHaMetadata);
    }

    /**
     * Handles reconciliation work that is not caused by a spec change.
     *
     * <p>If the job is {@code FAILED} and {@code OPERATOR_JOB_RESTART_FAILED} is enabled, the
     * failed job is cleaned up, the status error is cleared and the job is resubmitted without
     * requiring HA metadata. Otherwise a savepoint and a checkpoint are triggered if needed.
     *
     * @param ctx context of the resource being reconciled
     * @return {@code true} if the job was resubmitted or at least one snapshot was triggered
     * @throws Exception if cleanup, resubmission or snapshot triggering fails
     */
    @Override
    public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws Exception {
        CommonStatus resourceStatus = (CommonStatus)ctx.getResource().getStatus();
        JobStatus observedState = JobStatus.valueOf((String)resourceStatus.getJobStatus().getState());

        boolean restartFailedJob = observedState == JobStatus.FAILED && ctx.getObserveConfig().getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED);
        if (!restartFailedJob) {
            boolean snapshotTriggered = SnapshotUtils.triggerSnapshotIfNeeded(ctx.getFlinkService(), ctx.getResource(), ctx.getObserveConfig(), SnapshotType.SAVEPOINT);
            // Compound assignment: the checkpoint trigger always runs, even if a savepoint was triggered.
            snapshotTriggered |= SnapshotUtils.triggerSnapshotIfNeeded(ctx.getFlinkService(), ctx.getResource(), ctx.getObserveConfig(), SnapshotType.CHECKPOINT);
            return snapshotTriggered;
        }

        LOG.info("Stopping failed Flink job...");
        this.cleanupAfterFailedJob(ctx);
        resourceStatus.setError(null);
        this.resubmitJob(ctx, false);
        return true;
    }

    /**
     * Resubmits the job using the spec that is currently recorded as deployed.
     *
     * <p>When {@code requireHaMetadata} is set, the upgrade mode of the recovered spec is
     * switched to {@code LAST_STATE} before it is handed to {@link #restoreJob}.
     *
     * @param ctx context of the resource being reconciled
     * @param requireHaMetadata whether the restore must rely on HA metadata
     * @throws Exception if the deployment fails
     */
    protected void resubmitJob(FlinkResourceContext<CR> ctx, boolean requireHaMetadata) throws Exception {
        LOG.info("Resubmitting Flink job...");
        // Must be SPEC (not Object): getJob() is called on it and restoreJob expects a SPEC.
        SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());
        if (requireHaMetadata) {
            specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }
        this.restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
    }

    /**
     * Cancels the current job and redeploys it from the initial savepoint path of the spec.
     *
     * <p>The job is cancelled with {@code STATELESS} mode, the spec's upgrade mode is set to
     * {@code SAVEPOINT} and the initial savepoint path is recorded as the last savepoint in the
     * status. The job is only deployed again if the desired state is {@code RUNNING}. Finally
     * the spec is recorded as deployed and marked stable.
     *
     * <p>A {@code null} initial savepoint path is tolerated: no last savepoint is recorded
     * (it is set to {@code null}) and the job is deployed without a savepoint. The job has
     * already been cancelled at that point, so failing on a missing path would leave the
     * resource without a running job.
     *
     * @param ctx context of the resource being reconciled
     * @param deployConfig configuration used for the final status update
     * @param resource resource being reconciled
     * @param status status of {@code resource}
     * @param currentDeploySpec spec to deploy; its upgrade mode is overwritten
     * @param desiredJobState job state requested by the spec
     * @throws Exception if cancelling or deploying fails
     */
    private void redeployWithSavepoint(FlinkResourceContext<CR> ctx, Configuration deployConfig, CR resource, STATUS status, SPEC currentDeploySpec, JobState desiredJobState) throws Exception {
        LOG.info("Redeploying from savepoint");
        this.cancelJob(ctx, UpgradeMode.STATELESS);
        Optional<String> savepointPath = Optional.ofNullable(currentDeploySpec.getJob().getInitialSavepointPath());
        currentDeploySpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        // Never record a Savepoint entry that has no location.
        status.getJobStatus().getSavepointInfo().setLastSavepoint(savepointPath.map(path -> Savepoint.of(path, SnapshotTriggerType.UNKNOWN)).orElse(null));
        if (desiredJobState == JobState.RUNNING) {
            // The deploy config is rebuilt from the spec because its upgrade mode was just changed.
            this.deploy(ctx, currentDeploySpec, ctx.getDeployConfig((AbstractFlinkSpec)currentDeploySpec), savepointPath, false);
        }
        ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, this.clock);
        status.getReconciliationStatus().markReconciledSpecAsStable();
    }

    /**
     * Cancels the job of the given resource using the supplied upgrade mode.
     *
     * <p>Within this class it is invoked with the upgrade mode chosen for suspending a running
     * job, and with {@code STATELESS} before a savepoint redeploy.
     *
     * @param var1 context of the resource whose job is cancelled
     * @param var2 upgrade mode to cancel with
     * @throws Exception if the cancellation fails
     */
    protected abstract void cancelJob(FlinkResourceContext<CR> var1, UpgradeMode var2) throws Exception;

    /**
     * Cleans up after a failed job.
     *
     * <p>Within this class it is invoked from {@code reconcileOtherChanges} right before a
     * {@code FAILED} job is resubmitted.
     *
     * @param var1 context of the resource whose failed job is cleaned up
     * @throws Exception if the cleanup fails
     */
    protected abstract void cleanupAfterFailedJob(FlinkResourceContext<CR> var1) throws Exception;

    /**
     * Immutable result of an upgrade-mode decision: an optional {@link UpgradeMode} plus an
     * {@code allowFallback} flag.
     *
     * <p>The factories produce three combinations: {@link #of} (mode present, no fallback),
     * {@link #unavailable()} (no mode, fallback allowed) and {@link #pendingUpgrade()} (no mode,
     * no fallback). How {@code allowFallback} is interpreted is up to the code consuming this
     * value; it is not used inside this class.
     */
    public static final class AvailableUpgradeMode {
        private final Optional<UpgradeMode> upgradeMode;
        private final boolean allowFallback;

        public AvailableUpgradeMode(Optional<UpgradeMode> upgradeMode, boolean allowFallback) {
            this.upgradeMode = upgradeMode;
            this.allowFallback = allowFallback;
        }

        /** Result carrying the given mode; fallback is not allowed. */
        static AvailableUpgradeMode of(UpgradeMode upgradeMode) {
            return new AvailableUpgradeMode(Optional.of(upgradeMode), false);
        }

        /** Result without a mode; fallback is allowed. */
        static AvailableUpgradeMode unavailable() {
            return new AvailableUpgradeMode(Optional.empty(), true);
        }

        /** Result without a mode; fallback is not allowed. */
        static AvailableUpgradeMode pendingUpgrade() {
            return new AvailableUpgradeMode(Optional.empty(), false);
        }

        /** Returns {@code true} if an upgrade mode is present. */
        public boolean isAvailable() {
            return this.upgradeMode.isPresent();
        }

        public Optional<UpgradeMode> getUpgradeMode() {
            return this.upgradeMode;
        }

        public boolean isAllowFallback() {
            return this.allowFallback;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof AvailableUpgradeMode)) {
                return false;
            }
            AvailableUpgradeMode that = (AvailableUpgradeMode)o;
            if (this.allowFallback != that.allowFallback) {
                return false;
            }
            // The public constructor does not reject a null Optional, so stay null-safe here.
            if (this.upgradeMode == null) {
                return that.upgradeMode == null;
            }
            return this.upgradeMode.equals(that.upgradeMode);
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = prime + (this.allowFallback ? 79 : 97);
            hash = hash * prime + (this.upgradeMode == null ? 43 : this.upgradeMode.hashCode());
            return hash;
        }

        @Override
        public String toString() {
            return "AbstractJobReconciler.AvailableUpgradeMode(upgradeMode=" + this.upgradeMode + ", allowFallback=" + this.allowFallback + ")";
        }
    }
}

