/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import io.fabric8.kubernetes.api.model.HasMetadata;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Observes the outcome of triggered savepoints and checkpoints of a Flink resource and prunes
 * savepoint/checkpoint history that exceeds the configured retention.
 *
 * <p>Two kinds of history are cleaned up: the legacy savepoint history stored in the resource
 * status, and {@link FlinkStateSnapshot} resources obtained through
 * {@link FlinkStateSnapshotUtils#getFlinkStateSnapshotsSupplier}.
 *
 * @param <CR> type of the observed Flink resource
 * @param <STATUS> status type of the observed Flink resource
 */
public class SnapshotObserver<CR extends AbstractFlinkResource<?, STATUS>, STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotObserver.class);

    /** Parses the Kubernetes creation timestamp of a snapshot resource into an {@link Instant}. */
    public static final Function<FlinkStateSnapshot, Instant> EXTRACT_SNAPSHOT_TIME =
            s -> DateTimeUtils.parseKubernetes(s.getMetadata().getCreationTimestamp());

    /** Only snapshot resources with one of these trigger types are candidates for clean-up. */
    private static final Set<SnapshotTriggerType> CLEAN_UP_SNAPSHOT_TRIGGER_TYPES =
            Set.of(SnapshotTriggerType.PERIODIC, SnapshotTriggerType.UPGRADE);

    private final EventRecorder eventRecorder;

    /**
     * Creates the observer.
     *
     * @param eventRecorder recorder used to emit warning events when a snapshot attempt fails
     */
    public SnapshotObserver(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    /**
     * Observes the savepoint state of the resource: checks an in-progress savepoint (if any),
     * records the latest checkpoint when the job is in a terminal state, and finally cleans up
     * the savepoint history.
     *
     * @param ctx context of the resource being observed
     */
    public void observeSavepointStatus(FlinkResourceContext<CR> ctx) {
        LOG.debug("Observing savepoint status");
        CR resource = ctx.getResource();
        JobStatus jobStatus = resource.getStatus().getJobStatus();
        String jobId = jobStatus.getJobId();

        if (SnapshotUtils.savepointInProgress(jobStatus)) {
            this.observeTriggeredSavepoint(ctx, jobId);
        }
        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
            this.observeLatestCheckpoint(ctx, jobId);
        }
        this.cleanupSavepointHistory(ctx);
    }

    /**
     * Observes an in-progress checkpoint of the resource, if any. Does nothing when snapshot
     * triggering is not supported for the observe configuration.
     *
     * @param ctx context of the resource being observed
     */
    public void observeCheckpointStatus(FlinkResourceContext<CR> ctx) {
        if (!SnapshotUtils.isSnapshotTriggeringSupported(ctx.getObserveConfig())) {
            return;
        }
        JobStatus jobStatus = ctx.getResource().getStatus().getJobStatus();
        if (SnapshotUtils.checkpointInProgress(jobStatus)) {
            this.observeTriggeredCheckpoint(ctx, jobStatus.getJobId());
        }
    }

    /**
     * Fetches the result of the currently triggered savepoint and updates the status accordingly.
     *
     * <p>While the operation is pending nothing is changed. On error a warning event is emitted
     * and the trigger is reset; the trigger nonce is only marked as reconciled once the grace
     * period has ended. On success the savepoint is recorded as the last savepoint.
     */
    private void observeTriggeredSavepoint(FlinkResourceContext<CR> ctx, String jobID) {
        CR resource = ctx.getResource();
        SavepointInfo savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
        LOG.debug("Observing in-progress savepoint");

        SavepointFetchResult fetchResult =
                ctx.getFlinkService()
                        .fetchSavepointInfo(savepointInfo.getTriggerId(), jobID, ctx.getObserveConfig());
        if (fetchResult.isPending()) {
            LOG.info("Savepoint operation not finished yet...");
            return;
        }

        String err = fetchResult.getError();
        if (err != null) {
            Duration gracePeriod =
                    ctx.getObserveConfig()
                            .get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
            if (SnapshotUtils.gracePeriodEnded(gracePeriod, savepointInfo)) {
                LOG.error("Savepoint attempt failed after grace period. Won't be retried again: {}", err);
                // Marking the nonce as reconciled is what prevents another attempt.
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                        savepointInfo.getTriggerType(), resource, SnapshotType.SAVEPOINT);
            } else {
                LOG.warn("Savepoint failed within grace period, retrying: {}", err);
            }
            this.eventRecorder.triggerEvent(
                    (AbstractFlinkResource<?, ?>) resource,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.SavepointError,
                    EventRecorder.Component.Operator,
                    savepointInfo.formatErrorMessage(
                            ((AbstractFlinkSpec) resource.getSpec()).getJob().getSavepointTriggerNonce()),
                    ctx.getKubernetesClient());
            savepointInfo.resetTrigger();
            return;
        }

        // The trigger nonce is only recorded on the savepoint for manually triggered ones.
        Long triggerNonce =
                SnapshotTriggerType.MANUAL == savepointInfo.getTriggerType()
                        ? ((AbstractFlinkSpec) resource.getSpec()).getJob().getSavepointTriggerNonce()
                        : null;
        Savepoint savepoint =
                new Savepoint(
                        savepointInfo.getTriggerTimestamp(),
                        fetchResult.getLocation(),
                        savepointInfo.getTriggerType(),
                        savepointInfo.getFormatType(),
                        triggerNonce);
        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                savepointInfo.getTriggerType(), resource, SnapshotType.SAVEPOINT);
        savepointInfo.updateLastSavepoint(savepoint);
    }

    /**
     * Fetches the result of the currently triggered checkpoint and updates the status accordingly.
     *
     * <p>While the operation is pending nothing is changed. On error a warning event is emitted
     * and the trigger is reset; the trigger nonce is only marked as reconciled once the grace
     * period has ended. On success the checkpoint is recorded as the last checkpoint.
     */
    private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String jobID) {
        CR resource = ctx.getResource();
        CheckpointInfo checkpointInfo = resource.getStatus().getJobStatus().getCheckpointInfo();
        LOG.info("Observing checkpoint status.");

        CheckpointFetchResult fetchResult =
                ctx.getFlinkService()
                        .fetchCheckpointInfo(checkpointInfo.getTriggerId(), jobID, ctx.getObserveConfig());
        if (fetchResult.isPending()) {
            LOG.info("Checkpoint operation not finished yet...");
            return;
        }

        String err = fetchResult.getError();
        if (err != null) {
            Duration gracePeriod =
                    ctx.getObserveConfig()
                            .get(KubernetesOperatorConfigOptions.OPERATOR_CHECKPOINT_TRIGGER_GRACE_PERIOD);
            if (SnapshotUtils.gracePeriodEnded(gracePeriod, checkpointInfo)) {
                LOG.error("Checkpoint attempt failed after grace period. Won't be retried again: {}", err);
                // Marking the nonce as reconciled is what prevents another attempt.
                ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                        checkpointInfo.getTriggerType(), resource, SnapshotType.CHECKPOINT);
            } else {
                LOG.warn("Checkpoint failed within grace period, retrying: {}", err);
            }
            this.eventRecorder.triggerEvent(
                    (AbstractFlinkResource<?, ?>) resource,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.CheckpointError,
                    EventRecorder.Component.Operator,
                    checkpointInfo.formatErrorMessage(
                            ((AbstractFlinkSpec) resource.getSpec()).getJob().getCheckpointTriggerNonce()),
                    ctx.getKubernetesClient());
            checkpointInfo.resetTrigger();
            return;
        }

        // The trigger nonce is only recorded on the checkpoint for manually triggered ones.
        Long triggerNonce =
                SnapshotTriggerType.MANUAL == checkpointInfo.getTriggerType()
                        ? ((AbstractFlinkSpec) resource.getSpec()).getJob().getCheckpointTriggerNonce()
                        : null;
        Checkpoint checkpoint =
                new Checkpoint(
                        checkpointInfo.getTriggerTimestamp(),
                        checkpointInfo.getTriggerType(),
                        checkpointInfo.getFormatType(),
                        triggerNonce);
        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                checkpointInfo.getTriggerType(), resource, SnapshotType.CHECKPOINT);
        checkpointInfo.updateLastCheckpoint(checkpoint);
    }

    /**
     * Cleans up the legacy savepoint history and, when savepoint clean-up is enabled, deletes the
     * snapshot resources selected by {@link #getFlinkStateSnapshotsToCleanUp}.
     */
    private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
        Set<FlinkStateSnapshot> snapshots = FlinkStateSnapshotUtils.getFlinkStateSnapshotsSupplier(ctx).get();
        // The legacy history is always processed, even when there are no snapshot resources.
        this.cleanupSavepointHistoryLegacy(ctx, snapshots);
        if (CollectionUtil.isNullOrEmpty(snapshots)) {
            return;
        }

        boolean cleanupEnabled =
                ctx.getObserveConfig().get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
        if (!cleanupEnabled) {
            return;
        }

        Set<FlinkStateSnapshot> savepointsToDelete =
                this.getFlinkStateSnapshotsToCleanUp(
                        snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), SnapshotType.SAVEPOINT);
        Set<FlinkStateSnapshot> checkpointsToDelete =
                this.getFlinkStateSnapshotsToCleanUp(
                        snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), SnapshotType.CHECKPOINT);
        Stream.concat(savepointsToDelete.stream(), checkpointsToDelete.stream())
                .forEach(snapshot -> ctx.getKubernetesClient().resource(snapshot).withTimeoutInMillis(0L).delete());
    }

    /**
     * Selects the snapshot resources of the given type that should be deleted.
     *
     * <p>Only snapshots that are not already marked for deletion and whose trigger type is
     * periodic or upgrade are considered. Candidates are visited from oldest to newest creation
     * time. A candidate is selected when more candidates remain than the maximum count allows, or
     * when it was created before the minimum retained timestamp. The most recently created
     * completed snapshot is never selected, and at least one candidate is always left over.
     *
     * @param snapshots all snapshot resources to consider
     * @param observeConfig configuration the retention settings are read from
     * @param operatorConfig operator configuration supplying the retention thresholds
     * @param snapshotType whether savepoints or checkpoints are being cleaned up
     * @return the snapshots to delete; empty when fewer than two candidates exist
     */
    @VisibleForTesting
    Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(Collection<FlinkStateSnapshot> snapshots, Configuration observeConfig, FlinkOperatorConfiguration operatorConfig, SnapshotType snapshotType) {
        boolean wantSavepoints = snapshotType == SnapshotType.SAVEPOINT;
        List<FlinkStateSnapshot> snapshotList =
                snapshots.stream()
                        .filter(s -> !s.isMarkedForDeletion())
                        .filter(s -> CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
                        .filter(s -> s.getSpec().isSavepoint() == wantSavepoints)
                        .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
                        .collect(Collectors.toList());
        FlinkStateSnapshot lastCompleteSnapshot =
                snapshotList.stream()
                        .filter(SnapshotObserver::isCompleted)
                        .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
                        .orElse(null);

        long maxCount = this.getMaxCountForSnapshotType(observeConfig, operatorConfig, snapshotType);
        long maxTms = this.getMinAgeForSnapshotType(observeConfig, operatorConfig, snapshotType);
        Set<FlinkStateSnapshot> result = new HashSet<>();
        if (snapshotList.size() < 2) {
            return result;
        }

        for (FlinkStateSnapshot snapshot : snapshotList) {
            if (snapshot.equals(lastCompleteSnapshot)) {
                continue;
            }
            // Stop once only a single candidate would remain.
            if (result.size() == snapshotList.size() - 1) {
                break;
            }
            long ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
            boolean overCount = (long) (snapshotList.size() - result.size()) > maxCount;
            boolean tooOld = ts < maxTms;
            if (overCount || tooOld) {
                result.add(snapshot);
            }
        }
        return result;
    }

    /**
     * Prunes the savepoint history stored in the resource status.
     *
     * <p>The allowed history size is the configured maximum count reduced by the number of
     * completed savepoint snapshot resources (never below zero). Entries beyond that size are
     * removed oldest-first, then entries older than the minimum retained timestamp are removed.
     * When no completed savepoint snapshot resource exists, the newest history entry survives the
     * age-based pass. Removed savepoints are disposed only when savepoint clean-up is enabled.
     *
     * @param ctx context of the resource being observed
     * @param allSecondarySnapshotResources snapshot resources of the observed resource; {@code
     *     null} is treated as no snapshot resources
     */
    @VisibleForTesting
    void cleanupSavepointHistoryLegacy(FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot> allSecondarySnapshotResources) {
        long maxTms = this.getMinAgeForSnapshotType(ctx.getObserveConfig(), ctx.getOperatorConfig(), SnapshotType.SAVEPOINT);
        long maxCount = this.getMaxCountForSnapshotType(ctx.getObserveConfig(), ctx.getOperatorConfig(), SnapshotType.SAVEPOINT);

        // The caller checks for null only after invoking this method, so guard here as well.
        long completedSavepointCrs =
                allSecondarySnapshotResources == null
                        ? 0L
                        : allSecondarySnapshotResources.stream()
                                .filter(SnapshotObserver::isCompleted)
                                .filter(s -> s.getSpec().isSavepoint())
                                .count();
        maxCount = Math.max(0L, maxCount - completedSavepointCrs);

        List<Savepoint> savepointHistory = ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getSavepointHistory();
        boolean savepointCleanupEnabled =
                ctx.getObserveConfig().get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);

        boolean onlyEntryMustBeKept = savepointHistory.size() == 1 && completedSavepointCrs == 0L;
        if (savepointHistory.isEmpty() || onlyEntryMustBeKept) {
            return;
        }

        Savepoint lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);

        // Count-based pass: drop the oldest entries until the history fits.
        while ((long) savepointHistory.size() > maxCount) {
            Savepoint removed = savepointHistory.remove(0);
            if (savepointCleanupEnabled) {
                this.disposeSavepointQuietly(ctx, removed.getLocation());
            }
        }

        // Age-based pass: drop entries older than the cutoff.
        Iterator<Savepoint> it = savepointHistory.iterator();
        while (it.hasNext()) {
            Savepoint sp = it.next();
            // Identity comparison is intentional: it protects exactly the newest history entry.
            boolean protectedLast = sp == lastSavepoint && completedSavepointCrs == 0L;
            if (protectedLast || sp.getTimeStamp() >= maxTms) {
                continue;
            }
            it.remove();
            if (savepointCleanupEnabled) {
                this.disposeSavepointQuietly(ctx, sp.getLocation());
            }
        }
    }

    /** Disposes the savepoint at the given path; failures are logged and not propagated. */
    private void disposeSavepointQuietly(FlinkResourceContext<CR> ctx, String path) {
        try {
            LOG.info("Disposing savepoint {}", path);
            ctx.getFlinkService().disposeSavepoint(path, ctx.getObserveConfig());
        }
        catch (Exception e) {
            // Best effort: a failed disposal must not abort the observation.
            LOG.error("Exception while disposing savepoint {}", path, e);
        }
    }

    /**
     * Returns the epoch-millisecond cutoff below which snapshots of the given type count as too
     * old. Checkpoints have no age limit (cutoff {@code 0}); for savepoints the cutoff is the
     * current time minus the configured maximum history age.
     *
     * @throws IllegalArgumentException if the snapshot type is not handled
     */
    private long getMinAgeForSnapshotType(Configuration observeConfig, FlinkOperatorConfiguration operatorConfig, SnapshotType snapshotType) {
        switch (snapshotType) {
            case CHECKPOINT: {
                return 0L;
            }
            case SAVEPOINT: {
                Duration maxAge = ConfigOptionUtils.getValueWithThreshold(observeConfig, KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, operatorConfig.getSavepointHistoryAgeThreshold());
                return System.currentTimeMillis() - maxAge.toMillis();
            }
        }
        throw new IllegalArgumentException(String.format("Unknown snapshot type %s", snapshotType.name()));
    }

    /**
     * Returns the maximum number of snapshots of the given type to retain; always at least one.
     *
     * @throws IllegalArgumentException if the snapshot type is not handled
     */
    private long getMaxCountForSnapshotType(Configuration observeConfig, FlinkOperatorConfiguration operatorConfig, SnapshotType snapshotType) {
        switch (snapshotType) {
            case CHECKPOINT: {
                return Math.max(1, observeConfig.get(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS));
            }
            case SAVEPOINT: {
                return Math.max(1, ConfigOptionUtils.getValueWithThreshold(observeConfig, KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, operatorConfig.getSavepointHistoryCountThreshold()));
            }
        }
        throw new IllegalArgumentException(String.format("Unknown snapshot type %s", snapshotType.name()));
    }

    /**
     * Stores the location of the job's last checkpoint as the upgrade savepoint path. When no
     * checkpoint is available, the path is cleared only if the job was cancelled.
     */
    private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
        CommonStatus<?> status = ctx.getResource().getStatus();
        JobStatus jobStatus = status.getJobStatus();
        ctx.getFlinkService()
                .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
                .ifPresentOrElse(
                        snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                        () -> {
                            if (ReconciliationUtils.isJobCancelled(status)) {
                                jobStatus.setUpgradeSavepointPath(null);
                            }
                        });
    }

    /** Returns whether the snapshot resource has a status whose state is {@code COMPLETED}. */
    private static boolean isCompleted(FlinkStateSnapshot snapshot) {
        return snapshot.getStatus() != null
                && FlinkStateSnapshotStatus.State.COMPLETED.equals(snapshot.getStatus().getState());
    }
}

