/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.utils;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SavepointUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);

    public static boolean savepointInProgress(JobStatus jobStatus) {
        return StringUtils.isNotEmpty((CharSequence)jobStatus.getSavepointInfo().getTriggerId());
    }

    public static SavepointStatus getLastSavepointStatus(AbstractFlinkResource<?, ?> resource) {
        CommonStatus status = (CommonStatus)resource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
        Long targetSavepointTriggerNonce = ((AbstractFlinkSpec)resource.getSpec()).getJob().getSavepointTriggerNonce();
        Long reconcileSavepointTriggerNonce = status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getSavepointTriggerNonce();
        if (savepointInfo.getTriggerId() != null) {
            return SavepointStatus.PENDING;
        }
        if (targetSavepointTriggerNonce != null && !Objects.equals(targetSavepointTriggerNonce, reconcileSavepointTriggerNonce)) {
            return SavepointStatus.PENDING;
        }
        Savepoint lastSavepoint = savepointInfo.getLastSavepoint();
        if (lastSavepoint != null) {
            if (Objects.equals(reconcileSavepointTriggerNonce, savepointInfo.getLastSavepoint().getTriggerNonce())) {
                return SavepointStatus.SUCCEEDED;
            }
            if (lastSavepoint.getTriggerType() != SavepointTriggerType.MANUAL) {
                return SavepointStatus.SUCCEEDED;
            }
        } else {
            return null;
        }
        return SavepointStatus.ABANDONED;
    }

    public static boolean triggerSavepointIfNeeded(FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf) throws Exception {
        Optional<SavepointTriggerType> triggerOpt = SavepointUtils.shouldTriggerSavepoint(resource, conf);
        if (triggerOpt.isEmpty()) {
            return false;
        }
        SavepointTriggerType triggerType = triggerOpt.get();
        flinkService.triggerSavepoint(((CommonStatus)resource.getStatus()).getJobStatus().getJobId(), triggerType, ((CommonStatus)resource.getStatus()).getJobStatus().getSavepointInfo(), conf);
        return true;
    }

    @VisibleForTesting
    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(AbstractFlinkResource<?, ?> resource, Configuration conf) {
        boolean triggerNonceChanged;
        CommonStatus status = (CommonStatus)resource.getStatus();
        JobSpec jobSpec = ((AbstractFlinkSpec)resource.getSpec()).getJob();
        JobStatus jobStatus = status.getJobStatus();
        if (!ReconciliationUtils.isJobRunning(status) || SavepointUtils.savepointInProgress(jobStatus)) {
            return Optional.empty();
        }
        boolean bl = triggerNonceChanged = jobSpec.getSavepointTriggerNonce() != null && !jobSpec.getSavepointTriggerNonce().equals(status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getSavepointTriggerNonce());
        if (triggerNonceChanged) {
            return Optional.of(SavepointTriggerType.MANUAL);
        }
        Duration savepointInterval = (Duration)conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
        if (savepointInterval.isZero()) {
            return Optional.empty();
        }
        long lastTriggerTs = jobStatus.getSavepointInfo().getLastPeriodicSavepointTimestamp();
        Instant lastTrigger = lastTriggerTs == 0L ? Instant.parse(resource.getMetadata().getCreationTimestamp()) : Instant.ofEpochMilli(lastTriggerTs);
        Instant now = Instant.now();
        if (lastTrigger.plus(savepointInterval).isBefore(Instant.now())) {
            LOG.info("Triggering new periodic savepoint after {}", (Object)Duration.between(lastTrigger, now));
            return Optional.of(SavepointTriggerType.PERIODIC);
        }
        return Optional.empty();
    }

    public static boolean gracePeriodEnded(Configuration conf, SavepointInfo savepointInfo) {
        Duration gracePeriod = (Duration)conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
        Instant endOfGracePeriod = Instant.ofEpochMilli(savepointInfo.getTriggerTimestamp()).plus(gracePeriod);
        return endOfGracePeriod.isBefore(Instant.now());
    }

    public static void resetTriggerIfJobNotRunning(AbstractFlinkResource<?, ?> resource, EventRecorder eventRecorder) {
        CommonStatus status = (CommonStatus)resource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        if (!ReconciliationUtils.isJobRunning(status) && SavepointUtils.savepointInProgress(jobStatus)) {
            SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
            ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
            savepointInfo.resetTrigger();
            LOG.error("Job is not running, cancelling savepoint operation");
            eventRecorder.triggerEvent(resource, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, SavepointUtils.createSavepointError(savepointInfo, ((AbstractFlinkSpec)resource.getSpec()).getJob().getSavepointTriggerNonce()));
        }
    }

    public static String createSavepointError(SavepointInfo savepointInfo, Long triggerNonce) {
        return SavepointTriggerType.PERIODIC == savepointInfo.getTriggerType() ? "Periodic savepoint failed" : "Savepoint failed for savepointTriggerNonce: " + triggerNonce;
    }

    public static SavepointFormatType getSavepointFormatType(Configuration configuration) {
        SavepointFormatType savepointFormatType = SavepointFormatType.CANONICAL;
        if (configuration.get(FlinkConfigBuilder.FLINK_VERSION) != null && ((FlinkVersion)configuration.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14)) {
            savepointFormatType = (SavepointFormatType)configuration.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
        }
        return savepointFormatType;
    }
}

