/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.utils;

import io.fabric8.kubernetes.client.KubernetesClient;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
import org.apache.logging.log4j.core.util.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotUtils.class);

    public static boolean savepointInProgress(JobStatus jobStatus) {
        return StringUtils.isNotEmpty((CharSequence)jobStatus.getSavepointInfo().getTriggerId());
    }

    public static boolean checkpointInProgress(JobStatus jobStatus) {
        return StringUtils.isNotEmpty((CharSequence)jobStatus.getCheckpointInfo().getTriggerId());
    }

    @VisibleForTesting
    public static SnapshotStatus getLastSnapshotStatus(AbstractFlinkResource<?, ?> resource, SnapshotType snapshotType) {
        SavepointInfo snapshotInfo;
        Long reconciledTriggerNonce;
        Long triggerNonce;
        CommonStatus status = (CommonStatus)resource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        JobSpec jobSpec = ((AbstractFlinkSpec)resource.getSpec()).getJob();
        JobSpec reconciledJobSpec = status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();
        switch (snapshotType) {
            case SAVEPOINT: {
                triggerNonce = jobSpec.getSavepointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
                snapshotInfo = jobStatus.getSavepointInfo();
                break;
            }
            case CHECKPOINT: {
                triggerNonce = jobSpec.getCheckpointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
                snapshotInfo = jobStatus.getCheckpointInfo();
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
            }
        }
        if (snapshotInfo.getTriggerId() != null) {
            return SnapshotStatus.PENDING;
        }
        if (triggerNonce != null && !Objects.equals(triggerNonce, reconciledTriggerNonce)) {
            return SnapshotStatus.PENDING;
        }
        Long lastTriggerNonce = snapshotInfo.getLastTriggerNonce();
        SnapshotTriggerType lastSnapshotTriggerType = snapshotInfo.getLastTriggerType();
        if (lastSnapshotTriggerType == null) {
            return null;
        }
        if (Objects.equals(reconciledTriggerNonce, lastTriggerNonce)) {
            return SnapshotStatus.SUCCEEDED;
        }
        if (lastSnapshotTriggerType != SnapshotTriggerType.MANUAL) {
            return SnapshotStatus.SUCCEEDED;
        }
        return SnapshotStatus.ABANDONED;
    }

    public static boolean triggerSnapshotIfNeeded(FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf, SnapshotType snapshotType) throws Exception {
        Optional<SnapshotTriggerType> triggerOpt = SnapshotUtils.shouldTriggerSnapshot(resource, conf, snapshotType);
        if (triggerOpt.isEmpty()) {
            return false;
        }
        SnapshotTriggerType triggerType = triggerOpt.get();
        String jobId = ((CommonStatus)resource.getStatus()).getJobStatus().getJobId();
        switch (snapshotType) {
            case SAVEPOINT: {
                flinkService.triggerSavepoint(jobId, triggerType, ((CommonStatus)resource.getStatus()).getJobStatus().getSavepointInfo(), conf);
                break;
            }
            case CHECKPOINT: {
                flinkService.triggerCheckpoint(jobId, triggerType, ((CommonStatus)resource.getStatus()).getJobStatus().getCheckpointInfo(), conf);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
            }
        }
        return true;
    }

    @VisibleForTesting
    protected static Optional<SnapshotTriggerType> shouldTriggerSnapshot(AbstractFlinkResource<?, ?> resource, Configuration conf, SnapshotType snapshotType) {
        Instant lastTrigger;
        boolean triggerNonceChanged;
        String automaticTriggerExpression;
        SavepointInfo snapshotInfo;
        boolean inProgress;
        Long reconciledTriggerNonce;
        Long triggerNonce;
        CommonStatus status = (CommonStatus)resource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        JobSpec jobSpec = ((AbstractFlinkSpec)resource.getSpec()).getJob();
        if (!ReconciliationUtils.isJobRunning(status)) {
            return Optional.empty();
        }
        JobSpec reconciledJobSpec = status.getReconciliationStatus().deserializeLastReconciledSpec().getJob();
        switch (snapshotType) {
            case SAVEPOINT: {
                triggerNonce = jobSpec.getSavepointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
                inProgress = SnapshotUtils.savepointInProgress(jobStatus);
                snapshotInfo = jobStatus.getSavepointInfo();
                automaticTriggerExpression = (String)conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
                break;
            }
            case CHECKPOINT: {
                triggerNonce = jobSpec.getCheckpointTriggerNonce();
                reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
                inProgress = SnapshotUtils.checkpointInProgress(jobStatus);
                snapshotInfo = jobStatus.getCheckpointInfo();
                automaticTriggerExpression = (String)conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
            }
        }
        if (inProgress) {
            return Optional.empty();
        }
        boolean bl = triggerNonceChanged = triggerNonce != null && !triggerNonce.equals(reconciledTriggerNonce);
        if (triggerNonceChanged) {
            if (snapshotType == SnapshotType.CHECKPOINT && !SnapshotUtils.isSnapshotTriggeringSupported(conf)) {
                LOG.warn("Manual checkpoint triggering is attempted, but is not supported (requires Flink 1.17+)");
                return Optional.empty();
            }
            return Optional.of(SnapshotTriggerType.MANUAL);
        }
        long lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
        Instant instant = lastTrigger = lastTriggerTs == 0L ? Instant.parse(resource.getMetadata().getCreationTimestamp()) : Instant.ofEpochMilli(lastTriggerTs);
        if (SnapshotUtils.shouldTriggerAutomaticSnapshot(snapshotType, automaticTriggerExpression, lastTrigger)) {
            if (snapshotType == SnapshotType.CHECKPOINT && !SnapshotUtils.isSnapshotTriggeringSupported(conf)) {
                LOG.warn("Automatic checkpoints triggering is configured but is not supported (requires Flink 1.17+)");
                return Optional.empty();
            }
            return Optional.of(SnapshotTriggerType.PERIODIC);
        }
        return Optional.empty();
    }

    @VisibleForTesting
    static boolean shouldTriggerAutomaticSnapshot(SnapshotType snapshotType, String automaticTriggerExpression, Instant lastTrigger) {
        if (StringUtils.isBlank((CharSequence)automaticTriggerExpression)) {
            return false;
        }
        Optional<Duration> interval = SnapshotUtils.interpretAsInterval(automaticTriggerExpression);
        Optional<CronExpression> cron = SnapshotUtils.interpretAsCron(automaticTriggerExpression);
        if (interval.isPresent() && cron.isPresent()) {
            LOG.error("Something went wrong with the automatic {} trigger expression {}. This setting cannot be simultaneously a valid Duration and a cron expression.", (Object)snapshotType, (Object)automaticTriggerExpression);
            return false;
        }
        if (interval.isPresent()) {
            return SnapshotUtils.shouldTriggerIntervalBasedSnapshot(snapshotType, interval.get(), lastTrigger);
        }
        if (cron.isPresent()) {
            return SnapshotUtils.shouldTriggerCronBasedSnapshot(snapshotType, cron.get(), lastTrigger, Instant.now());
        }
        LOG.warn("Automatic {} triggering is configured, but the trigger expression '{}' is neither a valid Duration, nor a cron expression.", (Object)snapshotType, (Object)automaticTriggerExpression);
        return false;
    }

    @VisibleForTesting
    static boolean shouldTriggerCronBasedSnapshot(SnapshotType snapshotType, CronExpression cronExpression, Instant lastTriggerDateInstant, Instant nowInstant) {
        Date now = Date.from(nowInstant);
        Date lastTrigger = Date.from(lastTriggerDateInstant);
        Date nextValidTimeAfterLastTrigger = cronExpression.getNextValidTimeAfter(lastTrigger);
        if (nextValidTimeAfterLastTrigger != null && nextValidTimeAfterLastTrigger.before(now)) {
            LOG.info("Triggering new automatic {} based on cron schedule '{}' due at {}", new Object[]{snapshotType.toString().toLowerCase(), cronExpression.toString(), nextValidTimeAfterLastTrigger});
            return true;
        }
        return false;
    }

    @VisibleForTesting
    static boolean shouldTriggerIntervalBasedSnapshot(SnapshotType snapshotType, Duration interval, Instant lastTrigger) {
        if (interval.isZero()) {
            return false;
        }
        Instant now = Instant.now();
        if (lastTrigger.plus(interval).isBefore(Instant.now())) {
            LOG.info("Triggering new automatic {} after {}", (Object)snapshotType.toString().toLowerCase(), (Object)Duration.between(lastTrigger, now));
            return true;
        }
        return false;
    }

    @VisibleForTesting
    static Optional<Duration> interpretAsInterval(String triggerExpression) {
        try {
            return Optional.of((Duration)ConfigurationUtils.convertValue((Object)triggerExpression, Duration.class));
        }
        catch (Exception exception) {
            return Optional.empty();
        }
    }

    @VisibleForTesting
    static Optional<CronExpression> interpretAsCron(String triggerExpression) {
        try {
            return Optional.of(new CronExpression(triggerExpression));
        }
        catch (ParseException e) {
            return Optional.empty();
        }
    }

    public static boolean isSnapshotTriggeringSupported(Configuration conf) {
        return conf.get(FlinkConfigBuilder.FLINK_VERSION) != null && ((FlinkVersion)conf.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_16);
    }

    public static boolean gracePeriodEnded(Duration gracePeriod, SnapshotInfo snapshotInfo) {
        Instant endOfGracePeriod = Instant.ofEpochMilli(snapshotInfo.getTriggerTimestamp()).plus(gracePeriod);
        return endOfGracePeriod.isBefore(Instant.now());
    }

    public static void resetSnapshotTriggers(AbstractFlinkResource<?, ?> resource, EventRecorder eventRecorder, KubernetesClient client) {
        CommonStatus status = (CommonStatus)resource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        if (!ReconciliationUtils.isJobRunning(status)) {
            if (SnapshotUtils.savepointInProgress(jobStatus)) {
                SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce((SnapshotInfo)savepointInfo, resource, SnapshotType.SAVEPOINT);
                savepointInfo.resetTrigger();
                LOG.error("Job is not running, cancelling savepoint operation");
                eventRecorder.triggerEvent(resource, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, savepointInfo.formatErrorMessage(((AbstractFlinkSpec)resource.getSpec()).getJob().getSavepointTriggerNonce()), client);
            }
            if (SnapshotUtils.checkpointInProgress(jobStatus)) {
                CheckpointInfo checkpointInfo = jobStatus.getCheckpointInfo();
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce((SnapshotInfo)checkpointInfo, resource, SnapshotType.CHECKPOINT);
                checkpointInfo.resetTrigger();
                LOG.error("Job is not running, cancelling checkpoint operation");
                eventRecorder.triggerEvent(resource, EventRecorder.Type.Warning, EventRecorder.Reason.CheckpointError, EventRecorder.Component.Operator, checkpointInfo.formatErrorMessage(((AbstractFlinkSpec)resource.getSpec()).getJob().getCheckpointTriggerNonce()), client);
            }
        }
    }
}

