/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.snapshot;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reconciler for {@link FlinkStateSnapshot} resources.
 *
 * <p>{@link #reconcile(FlinkStateSnapshotContext)} triggers a savepoint or checkpoint on the
 * referenced Flink job for snapshots that are still waiting to be triggered, and
 * {@link #cleanup(FlinkStateSnapshotContext)} decides whether savepoint data has to be disposed
 * of before the resource may be deleted.
 */
public class StateSnapshotReconciler {
    private static final Logger LOG = LoggerFactory.getLogger(StateSnapshotReconciler.class);

    private final FlinkResourceContextFactory ctxFactory;
    private final EventRecorder eventRecorder;

    /**
     * Creates a reconciler.
     *
     * @param ctxFactory factory used to build the resource context of the referenced Flink
     *     deployment
     * @param eventRecorder recorder handed to the snapshot utilities when a snapshot is abandoned
     */
    public StateSnapshotReconciler(FlinkResourceContextFactory ctxFactory, EventRecorder eventRecorder) {
        this.ctxFactory = ctxFactory;
        this.eventRecorder = eventRecorder;
    }

    /**
     * Triggers the snapshot described by the resource if it is in state {@code TRIGGER_PENDING}.
     *
     * <p>Resources in any other state are left untouched. A savepoint whose spec marks it as
     * already existing is recorded as successful without contacting the job. Otherwise the
     * snapshot is triggered on the referenced job and the resource is marked as in progress with
     * the returned trigger id.
     *
     * @param ctx context of the snapshot resource being reconciled
     * @throws ReconciliationException if triggering the snapshot fails
     */
    public void reconcile(FlinkStateSnapshotContext ctx) {
        FlinkStateSnapshot resource = ctx.getResource();
        FlinkStateSnapshotSpec spec = resource.getSpec();

        if (FlinkStateSnapshotStatus.State.TRIGGER_PENDING != resource.getStatus().getState()) {
            return;
        }

        // Boolean.TRUE.equals(...) instead of unboxing: a null flag must not crash reconciliation.
        if (spec.isSavepoint() && Boolean.TRUE.equals(spec.getSavepoint().getAlreadyExists())) {
            LOG.info("Snapshot {} is marked as completed in spec, skipping triggering savepoint.", resource.getMetadata().getName());
            FlinkStateSnapshotUtils.snapshotSuccessful(resource, spec.getSavepoint().getPath(), true);
            return;
        }

        if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(ctx.getKubernetesClient(), ctx.getResource(), ctx.getSecondaryResource().orElse(null), this.eventRecorder)) {
            return;
        }

        String jobId = ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();

        Optional<String> triggerIdOpt;
        try {
            triggerIdOpt = this.triggerCheckpointOrSavepoint(spec, ctx, jobId);
        } catch (Exception e) {
            LOG.error("Failed to trigger snapshot for resource {}", ctx.getResource(), e);
            throw new ReconciliationException(e);
        }

        // Defensive guard: the trigger helper currently either returns a value or throws.
        if (triggerIdOpt.isEmpty()) {
            LOG.warn("Failed to trigger snapshot {}", resource.getMetadata().getName());
            return;
        }
        FlinkStateSnapshotUtils.snapshotInProgress(resource, triggerIdOpt.get());
    }

    /**
     * Handles deletion of a snapshot resource.
     *
     * <p>Checkpoints, savepoints without {@code disposeOnDelete} set to true, and snapshots
     * without a named job reference are deleted right away. For the remaining savepoints the
     * outcome depends on the snapshot state: an in-progress savepoint postpones deletion by the
     * operator's reconcile interval, a completed one is disposed of first, and every other state
     * is deleted without disposal.
     *
     * @param ctx context of the snapshot resource being deleted
     * @return the delete decision for the operator framework
     * @throws IllegalArgumentException if a completed savepoint must be disposed of but the
     *     referenced job cannot be found or is not running
     * @throws Exception declared for callers; kept for interface compatibility
     */
    public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws Exception {
        FlinkStateSnapshot resource = ctx.getResource();
        FlinkStateSnapshotSpec spec = resource.getSpec();
        FlinkStateSnapshotStatus.State state = resource.getStatus().getState();
        String resourceName = resource.getMetadata().getName();
        LOG.info("Cleaning up resource {}...", resourceName);

        if (spec.isCheckpoint()) {
            return DeleteControl.defaultDelete();
        }
        // A null flag is treated as "do not dispose": failing with a NullPointerException here
        // would prevent this method from ever returning a delete decision, and skipping
        // disposal never destroys data.
        if (!Boolean.TRUE.equals(spec.getSavepoint().getDisposeOnDelete())) {
            return DeleteControl.defaultDelete();
        }
        if (spec.getJobReference() == null || spec.getJobReference().getName() == null) {
            return DeleteControl.defaultDelete();
        }

        switch (state) {
            case IN_PROGRESS: {
                LOG.info("Cannot delete resource {} yet as savepoint is still in progress...", resourceName);
                return DeleteControl.noFinalizerRemoval().rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
            }
            case COMPLETED: {
                FlinkDeployment flinkDeployment = this.getFlinkDeployment(ctx);
                return this.handleSnapshotCleanup(resource, flinkDeployment, ctx);
            }
            case FAILED:
            case TRIGGER_PENDING:
            case ABANDONED: {
                LOG.info("Savepoint state is {}, cleaning up resource {} without disposal...", state.name(), resourceName);
                return DeleteControl.defaultDelete();
            }
            default: {
                LOG.info("Unknown savepoint state for {}: {}", resourceName, state);
                return DeleteControl.defaultDelete();
            }
        }
    }

    /**
     * Resolves the Flink deployment referenced by the snapshot, requiring its job to be running.
     *
     * @param ctx context of the snapshot resource
     * @return the referenced Flink deployment
     * @throws IllegalArgumentException if the referenced job is missing or not running
     */
    private FlinkDeployment getFlinkDeployment(FlinkStateSnapshotContext ctx) {
        Optional<AbstractFlinkResource<?, ?>> secondaryResourceOpt = ctx.getSecondaryResource();
        if (secondaryResourceOpt.isEmpty()) {
            throw new IllegalArgumentException("Cannot dispose of snapshot because referenced Flink job cannot be found!");
        }
        if (!ReconciliationUtils.isJobRunning(secondaryResourceOpt.get().getStatus())) {
            throw new IllegalArgumentException("Cannot dispose of snapshot because referenced Flink job is not running!");
        }
        return ctx.getReferencedJobFlinkDeployment();
    }

    /**
     * Disposes of the savepoint recorded in the snapshot status, then allows deletion.
     *
     * <p>If no path was recorded there is nothing to dispose of. If disposal fails, the finalizer
     * is kept and the cleanup is rescheduled after the operator's reconcile interval.
     *
     * @param flinkStateSnapshot snapshot resource being deleted
     * @param flinkDeployment deployment whose Flink service performs the disposal
     * @param ctx context of the snapshot resource
     * @return the delete decision for the operator framework
     */
    private DeleteControl handleSnapshotCleanup(FlinkStateSnapshot flinkStateSnapshot, FlinkDeployment flinkDeployment, FlinkStateSnapshotContext ctx) {
        String resourceName = flinkStateSnapshot.getMetadata().getName();
        String path = flinkStateSnapshot.getStatus().getPath();
        if (path == null) {
            LOG.info("No path was saved for snapshot {}, no cleanup required.", resourceName);
            return DeleteControl.defaultDelete();
        }

        LOG.info("Disposing of savepoint of {} before deleting the resource...", resourceName);
        Configuration observeConfig = ctx.getReferencedJobObserveConfig();
        FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment = this.ctxFactory.getResourceContext(flinkDeployment, ctx.getJosdkContext());
        FlinkService flinkService = ctxFlinkDeployment.getFlinkService();
        try {
            flinkService.disposeSavepoint(path, observeConfig);
            return DeleteControl.defaultDelete();
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause and stack trace are logged;
            // the failure is otherwise only visible as a repeated reschedule.
            LOG.error("Failed to dispose savepoint {} from deployment {}", path, ctxFlinkDeployment.getResource().getMetadata().getName(), e);
            return DeleteControl.noFinalizerRemoval().rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
        }
    }

    /**
     * Triggers a savepoint or a full checkpoint on the referenced job, depending on the spec.
     *
     * <p>For savepoints the target path is taken from the spec and falls back to the savepoint
     * directory in the deployment's observe config.
     *
     * @param spec snapshot spec describing what to trigger
     * @param ctx context of the snapshot resource
     * @param jobId id of the job to snapshot
     * @return the trigger id returned by the Flink service
     * @throws IllegalArgumentException if no savepoint path can be determined, if checkpoint
     *     triggering is not supported for the job, or if the spec is neither a savepoint nor a
     *     checkpoint
     * @throws Exception if the Flink service fails to trigger the snapshot
     */
    private Optional<String> triggerCheckpointOrSavepoint(FlinkStateSnapshotSpec spec, FlinkStateSnapshotContext ctx, String jobId) throws Exception {
        FlinkResourceContext<FlinkDeployment> flinkDeploymentContext = this.ctxFactory.getResourceContext(ctx.getReferencedJobFlinkDeployment(), ctx.getJosdkContext());
        FlinkService flinkService = flinkDeploymentContext.getFlinkService();
        Configuration conf = Preconditions.checkNotNull(flinkDeploymentContext.getObserveConfig(), String.format("Observe config was null for %s", flinkDeploymentContext.getResource().getMetadata().getName()));

        if (spec.isSavepoint()) {
            String path = ObjectUtils.firstNonNull(spec.getSavepoint().getPath(), conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
            if (path == null) {
                throw new IllegalArgumentException(String.format("Either the savepoint path in the spec or configuration %s in the Flink resource has to be supplied", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
            // The spec's format type is mapped by name onto Flink's own SavepointFormatType enum.
            SavepointFormatType formatType = SavepointFormatType.valueOf(spec.getSavepoint().getFormatType().name());
            return Optional.of(flinkService.triggerSavepoint(jobId, formatType, path, conf));
        }

        if (spec.isCheckpoint()) {
            if (!SnapshotUtils.isSnapshotTriggeringSupported(conf)) {
                throw new IllegalArgumentException("Manual checkpoint triggering is not supported for this Flink job (requires Flink 1.17+)");
            }
            return Optional.of(flinkService.triggerCheckpoint(jobId, CheckpointType.FULL, conf));
        }

        throw new IllegalArgumentException("Snapshot must specify either savepoint or checkpoint spec");
    }
}

