/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.utils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Instant;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;

/**
 * Static helpers for creating {@link FlinkStateSnapshot} resources, reading their labels and
 * moving their status between states.
 */
public class FlinkStateSnapshotUtils {
    /** Label key under which the {@link SnapshotTriggerType} name of a snapshot is stored. */
    private static final String LABEL_SNAPSHOT_TRIGGER_TYPE = "snapshot.trigger-type";

    /** Label key reporting whether a snapshot is a savepoint or a checkpoint. */
    private static final String LABEL_SNAPSHOT_TYPE = "snapshot.type";

    /** Label key reporting the current state of a snapshot. */
    private static final String LABEL_SNAPSHOT_STATE = "snapshot.state";

    /** Label key reporting the kind of the resource referenced by a snapshot. */
    private static final String LABEL_JOB_REFERENCE_KIND = "job-reference.kind";

    /** Label key reporting the name of the resource referenced by a snapshot. */
    private static final String LABEL_JOB_REFERENCE_NAME = "job-reference.name";

    /**
     * Builds a {@link FlinkStateSnapshot} from the given coordinates and spec, labels it with the
     * trigger type and creates it through the Kubernetes client.
     *
     * @param kubernetesClient client used to create the resource
     * @param namespace namespace of the new resource
     * @param name name of the new resource
     * @param spec spec of the new resource
     * @param triggerType trigger type stored in the {@code snapshot.trigger-type} label
     * @return the resource as returned by the client's create call
     */
    protected static FlinkStateSnapshot createFlinkStateSnapshot(
            KubernetesClient kubernetesClient,
            String namespace,
            String name,
            FlinkStateSnapshotSpec spec,
            SnapshotTriggerType triggerType) {
        // Set the label map explicitly rather than mutating whatever a fresh ObjectMeta exposes,
        // so this does not depend on the model pre-initializing its labels.
        Map<String, String> labels = new HashMap<>();
        labels.put(LABEL_SNAPSHOT_TRIGGER_TYPE, triggerType.name());

        ObjectMeta metadata = new ObjectMeta();
        metadata.setNamespace(namespace);
        metadata.setName(name);
        metadata.setLabels(labels);

        FlinkStateSnapshot snapshot = new FlinkStateSnapshot();
        snapshot.setSpec(spec);
        snapshot.setMetadata(metadata);
        return kubernetesClient.resource(snapshot).create();
    }

    /**
     * Reads the trigger type of a snapshot from its {@code snapshot.trigger-type} label.
     *
     * @param snapshot snapshot resource to inspect
     * @return the parsed trigger type, or {@link SnapshotTriggerType#UNKNOWN} when the label is
     *     absent or does not name a known trigger type
     */
    public static SnapshotTriggerType getSnapshotTriggerType(FlinkStateSnapshot snapshot) {
        String triggerTypeStr = labelsOrEmpty(snapshot).get(LABEL_SNAPSHOT_TRIGGER_TYPE);
        if (triggerTypeStr == null) {
            // Checked explicitly instead of relying on valueOf(null) throwing.
            return SnapshotTriggerType.UNKNOWN;
        }
        try {
            return SnapshotTriggerType.valueOf(triggerTypeStr);
        } catch (IllegalArgumentException ignored) {
            // The label carries a value that is not a SnapshotTriggerType constant.
            return SnapshotTriggerType.UNKNOWN;
        }
    }

    /**
     * Creates a savepoint {@link FlinkStateSnapshot} resource referencing the given Flink resource.
     *
     * <p>The savepoint spec is flagged as already existing when the trigger type is {@link
     * SnapshotTriggerType#UPGRADE}.
     *
     * @param kubernetesClient client used to create the resource
     * @param resource Flink resource the snapshot refers to; also supplies the namespace
     * @param savepointPath savepoint path to store in the spec, may be {@code null}
     * @param triggerType trigger type of the snapshot
     * @param savepointFormatType format type to store in the spec
     * @param disposeOnDelete value of the spec's dispose-on-delete flag
     * @return the created snapshot resource
     */
    public static FlinkStateSnapshot createSavepointResource(
            KubernetesClient kubernetesClient,
            AbstractFlinkResource<?, ?> resource,
            @Nullable String savepointPath,
            SnapshotTriggerType triggerType,
            SavepointFormatType savepointFormatType,
            boolean disposeOnDelete) {
        SavepointSpec savepointSpec =
                SavepointSpec.builder()
                        .path(savepointPath)
                        .formatType(savepointFormatType)
                        .disposeOnDelete(disposeOnDelete)
                        .alreadyExists(triggerType == SnapshotTriggerType.UPGRADE)
                        .build();
        FlinkStateSnapshotSpec snapshotSpec =
                FlinkStateSnapshotSpec.builder()
                        .jobReference(JobReference.fromFlinkResource(resource))
                        .savepoint(savepointSpec)
                        .build();
        String resourceName =
                getFlinkStateSnapshotName(SnapshotType.SAVEPOINT, triggerType, resource);
        return createFlinkStateSnapshot(
                kubernetesClient,
                resource.getMetadata().getNamespace(),
                resourceName,
                snapshotSpec,
                triggerType);
    }

    /**
     * Creates a checkpoint {@link FlinkStateSnapshot} resource referencing the given Flink
     * resource.
     *
     * @param kubernetesClient client used to create the resource
     * @param resource Flink resource the snapshot refers to; also supplies the namespace
     * @param triggerType trigger type of the snapshot
     * @return the created snapshot resource
     */
    public static FlinkStateSnapshot createCheckpointResource(
            KubernetesClient kubernetesClient,
            AbstractFlinkResource<?, ?> resource,
            SnapshotTriggerType triggerType) {
        FlinkStateSnapshotSpec snapshotSpec =
                FlinkStateSnapshotSpec.builder()
                        .jobReference(JobReference.fromFlinkResource(resource))
                        .checkpoint(new CheckpointSpec())
                        .build();
        String resourceName =
                getFlinkStateSnapshotName(SnapshotType.CHECKPOINT, triggerType, resource);
        return createFlinkStateSnapshot(
                kubernetesClient,
                resource.getMetadata().getNamespace(),
                resourceName,
                snapshotSpec,
                triggerType);
    }

    /**
     * Tells whether snapshot resources are enabled, which requires both the {@code
     * SNAPSHOT_RESOURCE_ENABLED} option of the given configuration and the operator-level flag.
     *
     * @param operatorConfiguration operator configuration
     * @param configuration configuration holding the {@code SNAPSHOT_RESOURCE_ENABLED} option
     * @return {@code true} only if both settings are enabled
     */
    public static boolean isSnapshotResourceEnabled(
            FlinkOperatorConfiguration operatorConfiguration, Configuration configuration) {
        return configuration.get(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED)
                && operatorConfiguration.isSnapshotResourcesEnabled();
    }

    /**
     * Generates a snapshot resource name of the form {@code
     * <resource-name>-<snapshot-type>-<trigger-type>-<current-millis>}.
     *
     * @param snapshotType snapshot type, lower-cased into the name
     * @param triggerType trigger type, lower-cased into the name
     * @param referencedResource resource whose name is used as the prefix
     * @return the generated name
     */
    public static String getFlinkStateSnapshotName(
            SnapshotType snapshotType,
            SnapshotTriggerType triggerType,
            AbstractFlinkResource<?, ?> referencedResource) {
        // Locale.ROOT keeps the result independent of the JVM default locale: a Turkish locale
        // would lower-case 'I' to a dotless 'ı', and some locales format %d with non-ASCII digits.
        return String.format(
                Locale.ROOT,
                "%s-%s-%s-%d",
                referencedResource.getMetadata().getName(),
                snapshotType.name().toLowerCase(Locale.ROOT),
                triggerType.name().toLowerCase(Locale.ROOT),
                System.currentTimeMillis());
    }

    /**
     * Creates a savepoint snapshot resource with trigger type {@link SnapshotTriggerType#UPGRADE},
     * provided snapshot resources are enabled.
     *
     * @param conf configuration supplying the enablement and dispose-on-delete options
     * @param operatorConf operator configuration supplying the operator-level enablement flag
     * @param kubernetesClient client used to create the resource
     * @param flinkResource Flink resource the snapshot refers to
     * @param savepointFormatType format type to store in the spec
     * @param savepointPath savepoint path to store in the spec
     * @return the created resource, or an empty {@link Optional} when snapshot resources are
     *     disabled
     */
    public static Optional<FlinkStateSnapshot> createUpgradeSnapshotResource(
            Configuration conf,
            FlinkOperatorConfiguration operatorConf,
            KubernetesClient kubernetesClient,
            AbstractFlinkResource<?, ?> flinkResource,
            SavepointFormatType savepointFormatType,
            String savepointPath) {
        if (!isSnapshotResourceEnabled(operatorConf, conf)) {
            return Optional.empty();
        }
        boolean disposeOnDelete =
                conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE);
        FlinkStateSnapshot savepointResource =
                createSavepointResource(
                        kubernetesClient,
                        flinkResource,
                        savepointPath,
                        SnapshotTriggerType.UPGRADE,
                        savepointFormatType,
                        disposeOnDelete);
        return Optional.of(savepointResource);
    }

    /**
     * Returns a supplier that looks up the {@link FlinkStateSnapshot} secondary resources of the
     * given context each time it is invoked.
     *
     * @param ctx resource context providing the configurations and the JOSDK context
     * @return supplier yielding the secondary snapshot resources, or an empty set when snapshot
     *     resources are disabled or the lookup returns {@code null}
     */
    public static Supplier<Set<FlinkStateSnapshot>> getFlinkStateSnapshotsSupplier(
            FlinkResourceContext<?> ctx) {
        return () -> {
            if (!isSnapshotResourceEnabled(ctx.getOperatorConfig(), ctx.getObserveConfig())) {
                return Set.of();
            }
            Set<FlinkStateSnapshot> secondaryResources =
                    ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
            return ObjectUtils.firstNonNull(secondaryResources, Set.of());
        };
    }

    /**
     * Marks the snapshot as abandoned when its referenced resource is missing or its job is not
     * running.
     *
     * @param client client passed on to the event recorder
     * @param snapshot snapshot resource to check and possibly update
     * @param secondaryResource resource referenced by the snapshot, or {@code null} if not found
     * @param eventRecorder recorder used to emit the abandonment event
     * @return {@code true} if the snapshot was abandoned, {@code false} otherwise
     */
    public static boolean abandonSnapshotIfJobNotRunning(
            KubernetesClient client,
            FlinkStateSnapshot snapshot,
            @Nullable AbstractFlinkResource<?, ?> secondaryResource,
            EventRecorder eventRecorder) {
        if (secondaryResource == null) {
            String message =
                    String.format(
                            "Secondary resource %s for savepoint %s was not found",
                            snapshot.getSpec().getJobReference(),
                            snapshot.getMetadata().getName());
            snapshotAbandoned(client, snapshot, eventRecorder, message);
            return true;
        }
        if (!ReconciliationUtils.isJobRunning(secondaryResource.getStatus())) {
            String message =
                    String.format(
                            "Secondary resource %s for savepoint %s is not running",
                            snapshot.getSpec().getJobReference(),
                            snapshot.getMetadata().getName());
            snapshotAbandoned(client, snapshot, eventRecorder, message);
            return true;
        }
        return false;
    }

    /**
     * Emits a warning event for the snapshot and sets its status to {@code ABANDONED} with the
     * given error, a cleared path and the current time as result timestamp.
     */
    private static void snapshotAbandoned(
            KubernetesClient kubernetesClient,
            FlinkStateSnapshot snapshot,
            EventRecorder eventRecorder,
            String error) {
        eventRecorder.triggerSnapshotEvent(
                snapshot,
                EventRecorder.Type.Warning,
                EventRecorder.Reason.SnapshotAbandoned,
                EventRecorder.Component.Snapshot,
                error,
                kubernetesClient);
        FlinkStateSnapshotStatus status = snapshot.getStatus();
        status.setState(FlinkStateSnapshotStatus.State.ABANDONED);
        status.setPath(null);
        status.setError(error);
        status.setResultTimestamp(DateTimeUtils.kubernetes(Instant.now()));
    }

    /**
     * Sets the snapshot status to {@code COMPLETED} with the given location, a cleared error and
     * the current time as result timestamp.
     *
     * @param snapshot snapshot resource whose status is updated
     * @param location path of the completed snapshot
     * @param setTriggerTimestamp whether the trigger timestamp is also set to the same time
     */
    public static void snapshotSuccessful(
            FlinkStateSnapshot snapshot, String location, boolean setTriggerTimestamp) {
        // One timestamp is computed so result and trigger timestamps are identical when both set.
        String time = DateTimeUtils.kubernetes(Instant.now());
        FlinkStateSnapshotStatus status = snapshot.getStatus();
        status.setState(FlinkStateSnapshotStatus.State.COMPLETED);
        status.setPath(location);
        status.setError(null);
        status.setResultTimestamp(time);
        if (setTriggerTimestamp) {
            status.setTriggerTimestamp(time);
        }
    }

    /**
     * Sets the snapshot status to {@code IN_PROGRESS} with the given trigger id and the current
     * time as trigger timestamp. A snapshot without a trigger-type label is labelled {@code
     * MANUAL}.
     *
     * @param snapshot snapshot resource whose labels and status are updated
     * @param triggerId trigger id to record in the status
     */
    public static void snapshotInProgress(FlinkStateSnapshot snapshot, String triggerId) {
        ObjectMeta metadata = snapshot.getMetadata();
        if (metadata.getLabels() == null) {
            // Defensive: make sure there is a label map to write the default into.
            metadata.setLabels(new HashMap<>());
        }
        metadata.getLabels()
                .putIfAbsent(LABEL_SNAPSHOT_TRIGGER_TYPE, SnapshotTriggerType.MANUAL.name());

        FlinkStateSnapshotStatus status = snapshot.getStatus();
        status.setState(FlinkStateSnapshotStatus.State.IN_PROGRESS);
        status.setTriggerId(triggerId);
        status.setTriggerTimestamp(DateTimeUtils.kubernetes(Instant.now()));
    }

    /**
     * Sets the snapshot status state to {@code TRIGGER_PENDING}.
     *
     * @param snapshot snapshot resource whose status is updated
     */
    public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) {
        snapshot.getStatus().setState(FlinkStateSnapshotStatus.State.TRIGGER_PENDING);
    }

    /**
     * Builds a label map describing the snapshot: its type, trigger type (defaulting to {@code
     * MANUAL} when unlabelled), its state when a status with a state is present, and the kind and
     * name of the referenced resource when one is given.
     *
     * @param snapshot snapshot resource to describe
     * @param secondaryResourceOpt resource referenced by the snapshot, if known
     * @return a new mutable map of label keys to values
     */
    public static Map<String, String> getSnapshotLabels(
            FlinkStateSnapshot snapshot,
            Optional<AbstractFlinkResource<?, ?>> secondaryResourceOpt) {
        Map<String, String> labels = new HashMap<>();
        SnapshotType snapshotType =
                snapshot.getSpec().isSavepoint() ? SnapshotType.SAVEPOINT : SnapshotType.CHECKPOINT;
        labels.put(LABEL_SNAPSHOT_TYPE, snapshotType.name());
        labels.put(
                LABEL_SNAPSHOT_TRIGGER_TYPE,
                labelsOrEmpty(snapshot)
                        .getOrDefault(
                                LABEL_SNAPSHOT_TRIGGER_TYPE, SnapshotTriggerType.MANUAL.name()));
        // The state label is skipped when there is no status, or a status without a state.
        Optional.ofNullable(snapshot.getStatus())
                .map(FlinkStateSnapshotStatus::getState)
                .ifPresent(state -> labels.put(LABEL_SNAPSHOT_STATE, state.name()));
        secondaryResourceOpt.ifPresent(
                secondaryResource -> {
                    labels.put(LABEL_JOB_REFERENCE_KIND, secondaryResource.getKind());
                    labels.put(
                            LABEL_JOB_REFERENCE_NAME, secondaryResource.getMetadata().getName());
                });
        return labels;
    }

    /**
     * Builds the {@link ResourceID} of the resource referenced by the snapshot, using the name
     * from the job reference and the namespace of the snapshot itself.
     *
     * @param snapshot snapshot resource holding the job reference
     * @return resource id of the referenced resource
     */
    public static ResourceID getSnapshotJobReferenceResourceId(FlinkStateSnapshot snapshot) {
        return new ResourceID(
                snapshot.getSpec().getJobReference().getName(),
                snapshot.getMetadata().getNamespace());
    }

    /** Returns the labels of the snapshot's metadata, or an empty map when they are null. */
    private static Map<String, String> labelsOrEmpty(FlinkStateSnapshot snapshot) {
        Map<String, String> labels = snapshot.getMetadata().getLabels();
        return labels != null ? labels : Map.of();
    }
}

