/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.time.Clock;
import java.time.Duration;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.reconciler.ReconciliationMetadata;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.ValidationException;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReconciliationUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationUtils.class);

    /**
     * Updates the resource status after the current spec has been deployed.
     *
     * <p>Delegates to {@link #updateStatusForSpecReconciliation} with {@code upgrading = false},
     * passing the job state declared in the spec, or {@code null} when the spec has no job.
     *
     * @param target resource whose status is updated
     * @param conf configuration forwarded to the status update
     * @param clock clock forwarded to the status update for the reconciliation timestamp
     */
    public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(AbstractFlinkResource<SPEC, ?> target, Configuration conf, Clock clock) {
        JobSpec jobSpec = ((AbstractFlinkSpec)target.getSpec()).getJob();
        JobState deployedState = null;
        if (jobSpec != null) {
            deployedState = jobSpec.getState();
        }
        ReconciliationUtils.updateStatusForSpecReconciliation(target, deployedState, conf, false, clock);
    }

    /**
     * Convenience overload of
     * {@link #updateStatusForDeployedSpec(AbstractFlinkResource, Configuration, Clock)} that uses
     * the system clock in the default time zone.
     *
     * @param target resource whose status is updated
     * @param conf configuration forwarded to the status update
     */
    @VisibleForTesting
    public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
        Clock systemClock = Clock.systemDefaultZone();
        ReconciliationUtils.updateStatusForDeployedSpec(target, conf, systemClock);
    }

    /**
     * Updates the resource status right before a deployment is attempted.
     *
     * <p>Delegates to {@link #updateStatusForSpecReconciliation} with {@code upgrading = true} and
     * {@link JobState#SUSPENDED} as the job state to record.
     *
     * @param target resource whose status is updated
     * @param conf configuration forwarded to the status update
     * @param clock clock forwarded to the status update for the reconciliation timestamp
     */
    public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(AbstractFlinkResource<SPEC, ?> target, Configuration conf, Clock clock) {
        final boolean upgrading = true;
        final JobState stateWhileUpgrading = JobState.SUSPENDED;
        ReconciliationUtils.updateStatusForSpecReconciliation(target, stateWhileUpgrading, conf, upgrading, clock);
    }

    /**
     * Convenience overload of
     * {@link #updateStatusBeforeDeploymentAttempt(AbstractFlinkResource, Configuration, Clock)}
     * that uses the system clock in the default time zone.
     *
     * @param target resource whose status is updated
     * @param conf configuration forwarded to the status update
     */
    @VisibleForTesting
    public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
        Clock systemClock = Clock.systemDefaultZone();
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(target, conf, systemClock);
    }

    /**
     * Updates the reconciliation status of {@code target} to reflect a spec reconciliation step.
     *
     * <p>The method clears the status error, stamps the reconciliation time from {@code clock},
     * and moves the reconciliation state forward:
     *
     * <ul>
     *   <li>from {@code ROLLING_BACK}: stays {@code ROLLING_BACK} while {@code upgrading},
     *       otherwise becomes {@code ROLLED_BACK};
     *   <li>from any other state: becomes {@code UPGRADING} while {@code upgrading}, otherwise
     *       {@code DEPLOYED}.
     * </ul>
     *
     * <p>In the rollback states only the upgrade mode of the last reconciled job spec is
     * refreshed from the current spec. Otherwise a clone of the current spec is stored as the
     * last reconciled spec; for specs with a job the clone carries {@code stateAfterReconcile}
     * and the trigger nonces of the previously reconciled spec (when one exists).
     *
     * @param target resource whose status is updated
     * @param stateAfterReconcile job state written into the stored spec clone
     * @param conf configuration used to derive the task manager info for {@link FlinkDeployment}s
     * @param upgrading whether the reconciliation is still in progress
     * @param clock source of the reconciliation timestamp
     */
    @VisibleForTesting
    public static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(AbstractFlinkResource<SPEC, ?> target, JobState stateAfterReconcile, Configuration conf, boolean upgrading, Clock clock) {
        CommonStatus status = (CommonStatus)target.getStatus();
        AbstractFlinkSpec spec = (AbstractFlinkSpec)target.getSpec();
        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();

        // Clear any previous error and record when this reconciliation happened.
        status.setError(null);
        reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());

        ReconciliationState newState;
        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
            newState = upgrading ? ReconciliationState.ROLLING_BACK : ReconciliationState.ROLLED_BACK;
        } else {
            newState = upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED;
        }
        reconciliationStatus.setState(newState);

        boolean rollback = newState == ReconciliationState.ROLLING_BACK || newState == ReconciliationState.ROLLED_BACK;
        if (rollback) {
            // During a rollback the stored spec is kept; only its upgrade mode follows the current spec.
            SpecWithMeta lastSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta();
            JobSpec lastJob = lastSpecWithMeta.getSpec().getJob();
            if (lastJob == null) {
                return;
            }
            lastJob.setUpgradeMode(spec.getJob().getUpgradeMode());
            reconciliationStatus.setLastReconciledSpec(SpecUtils.writeSpecWithMeta((AbstractFlinkSpec)lastSpecWithMeta.getSpec(), (ReconciliationMetadata)lastSpecWithMeta.getMeta()));
            return;
        }

        AbstractFlinkSpec clonedSpec = ReconciliationUtils.clone(spec);
        if (spec.getJob() == null) {
            // No job in the spec: store the clone unchanged.
            reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
            return;
        }

        JobSpec clonedJob = clonedSpec.getJob();
        clonedJob.setState(stateAfterReconcile);

        AbstractFlinkSpec previousSpec = reconciliationStatus.deserializeLastReconciledSpec();
        if (previousSpec != null) {
            // Carry over the previously reconciled trigger nonces instead of the ones in the current spec.
            clonedJob.setSavepointTriggerNonce(previousSpec.getJob().getSavepointTriggerNonce());
            clonedJob.setCheckpointTriggerNonce(previousSpec.getJob().getCheckpointTriggerNonce());
        }

        if (target instanceof FlinkDeployment) {
            String deploymentName = target.getMetadata().getName();
            ((FlinkDeploymentStatus)status).setTaskManager(ReconciliationUtils.getTaskManagerInfo(deploymentName, conf, stateAfterReconcile));
        }

        reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);

        // A spec whose job is declared SUSPENDED is marked stable right away.
        if (spec.getJob().getState() == JobState.SUSPENDED) {
            reconciliationStatus.markReconciledSpecAsStable();
        }
    }

    /**
     * Copies the snapshot trigger nonce of the current spec into the last reconciled spec.
     *
     * <p>Only snapshots whose trigger type is {@link SnapshotTriggerType#MANUAL} are handled;
     * for any other trigger type the method returns without touching the status. After the
     * nonce is copied, the last reconciled spec is re-serialized and the reconciliation
     * timestamp is set to the current system time.
     *
     * @param snapshotInfo information about the snapshot that was triggered
     * @param target resource whose reconciliation status is updated
     * @param snapshotType selects which nonce (savepoint or checkpoint) is copied
     */
    public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSnapshotTriggerNonce(SnapshotInfo snapshotInfo, AbstractFlinkResource<SPEC, ?> target, SnapshotType snapshotType) {
        boolean manuallyTriggered = snapshotInfo.getTriggerType() == SnapshotTriggerType.MANUAL;
        if (!manuallyTriggered) {
            return;
        }

        CommonStatus commonStatus = (CommonStatus)target.getStatus();
        AbstractFlinkSpec currentSpec = (AbstractFlinkSpec)target.getSpec();
        ReconciliationStatus reconciliationStatus = commonStatus.getReconciliationStatus();
        AbstractFlinkSpec lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();

        JobSpec reconciledJob = lastReconciledSpec.getJob();
        JobSpec currentJob = currentSpec.getJob();
        ReconciliationUtils.updateLastReconciledJobSpec(reconciledJob, currentJob, snapshotType);

        reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target);
        long now = System.currentTimeMillis();
        reconciliationStatus.setReconciliationTimestamp(now);
    }

    /**
     * Copies the trigger nonce matching {@code snapshotType} from {@code jobSpec} into
     * {@code lastReconciledJobSpec}.
     *
     * @param lastReconciledJobSpec job spec that receives the nonce
     * @param jobSpec job spec the nonce is read from
     * @param snapshotType {@code SAVEPOINT} or {@code CHECKPOINT}
     * @throws IllegalArgumentException for any other snapshot type
     */
    private static void updateLastReconciledJobSpec(JobSpec lastReconciledJobSpec, JobSpec jobSpec, SnapshotType snapshotType) {
        switch (snapshotType) {
            case SAVEPOINT:
                lastReconciledJobSpec.setSavepointTriggerNonce(jobSpec.getSavepointTriggerNonce());
                return;
            case CHECKPOINT:
                lastReconciledJobSpec.setCheckpointTriggerNonce(jobSpec.getCheckpointTriggerNonce());
                return;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }
    }

    /**
     * Builds the task manager info stored in the deployment status.
     *
     * @param name name used in the {@code app=} part of the label selector
     * @param conf configuration from which the number of task managers is read
     * @param jobState job state the info is built for
     * @return for {@link JobState#RUNNING}, an info with the task manager label selector and the
     *     configured task manager count; otherwise an info with an empty selector and count 0
     */
    private static TaskManagerInfo getTaskManagerInfo(String name, Configuration conf, JobState jobState) {
        if (jobState != JobState.RUNNING) {
            return new TaskManagerInfo("", 0);
        }
        String selector = "component=taskmanager,app=" + name;
        return new TaskManagerInfo(selector, FlinkUtils.getNumTaskManagers(conf));
    }

    /**
     * Records a reconciliation error on the resource held by {@code ctx}.
     *
     * <p>Delegates to {@link FlinkResourceExceptionUtils#updateFlinkResourceException}, passing
     * the context's resource and operator configuration.
     *
     * @param ctx context providing the resource and the operator configuration
     * @param error the error to record
     */
    public static void updateForReconciliationError(FlinkResourceContext ctx, Throwable error) {
        FlinkResourceExceptionUtils.updateFlinkResourceException(error, ctx.getResource(), ctx.getOperatorConfig());
    }

    /**
     * Returns a copy of {@code object} produced by {@link SpecUtils#clone}.
     *
     * @param object the object to copy
     * @return the copy, typed like the input
     */
    public static <T> T clone(T object) {
        T copy = (T)SpecUtils.clone(object);
        return copy;
    }

    /**
     * Builds the {@link UpdateControl} returned to the operator framework after a reconciliation.
     *
     * <p>The control never carries a resource update; it only decides when to reschedule:
     *
     * <ul>
     *   <li>no reschedule when {@code reschedule} is {@code false};
     *   <li>immediately (delay 0) when an upgrade just started or the current status requests
     *       immediate reconciliation;
     *   <li>for {@link FlinkDeploymentStatus}, after the delay computed by
     *       {@link #rescheduleAfter(JobManagerDeploymentStatus, FlinkDeployment, FlinkOperatorConfiguration)};
     *   <li>otherwise after the configured reconcile interval.
     * </ul>
     *
     * @param operatorConfiguration source of the configured intervals
     * @param current resource after reconciliation
     * @param previous resource before reconciliation
     * @param reschedule whether a reschedule should be requested at all
     * @return the update control describing the reschedule behaviour
     */
    public static <SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, R extends CustomResource<SPEC, STATUS>> UpdateControl<R> toUpdateControl(FlinkOperatorConfiguration operatorConfiguration, R current, R previous, boolean reschedule) {
        CommonStatus currentStatus = (CommonStatus)current.getStatus();
        UpdateControl control = UpdateControl.noUpdate();
        if (!reschedule) {
            return control;
        }

        boolean reconcileImmediately = ReconciliationUtils.upgradeStarted(currentStatus.getReconciliationStatus(), ((CommonStatus)previous.getStatus()).getReconciliationStatus())
                || ((CommonStatus)current.getStatus()).isImmediateReconciliationNeeded();

        long delayMillis;
        if (reconcileImmediately) {
            delayMillis = 0L;
        } else if (currentStatus instanceof FlinkDeploymentStatus) {
            JobManagerDeploymentStatus jmStatus = ((FlinkDeploymentStatus)currentStatus).getJobManagerDeploymentStatus();
            delayMillis = ReconciliationUtils.rescheduleAfter(jmStatus, (FlinkDeployment)current, operatorConfiguration).toMillis();
        } else {
            delayMillis = operatorConfiguration.getReconcileInterval().toMillis();
        }
        return (UpdateControl)control.rescheduleAfter(delayMillis);
    }

    /**
     * Chooses the reschedule delay for a {@link FlinkDeployment} from its JobManager deployment
     * status.
     *
     * <ul>
     *   <li>{@code DEPLOYING}: progress check interval;
     *   <li>{@code READY}: progress check interval while a savepoint trigger id is set on the job
     *       status, otherwise the reconcile interval;
     *   <li>{@code MISSING}, {@code ERROR}: reconcile interval;
     *   <li>{@code DEPLOYED_NOT_READY}: REST API ready delay.
     * </ul>
     *
     * @param status JobManager deployment status to decide on
     * @param flinkDeployment deployment whose job status is inspected in the {@code READY} case
     * @param operatorConfiguration source of the configured intervals
     * @return the delay after which reconciliation should run again
     * @throws RuntimeException if {@code status} is none of the values listed above
     */
    public static Duration rescheduleAfter(JobManagerDeploymentStatus status, FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration) {
        switch (status) {
            case DEPLOYING:
                return operatorConfiguration.getProgressCheckInterval();
            case READY:
                if (ReconciliationUtils.savepointInProgress(((FlinkDeploymentStatus)flinkDeployment.getStatus()).getJobStatus())) {
                    return operatorConfiguration.getProgressCheckInterval();
                }
                return operatorConfiguration.getReconcileInterval();
            case MISSING:
            case ERROR:
                return operatorConfiguration.getReconcileInterval();
            case DEPLOYED_NOT_READY:
                return operatorConfiguration.getRestApiReadyDelay();
            default:
                throw new RuntimeException("Unknown status: " + status);
        }
    }

    /**
     * Tells whether a savepoint is in progress, i.e. whether the savepoint info of
     * {@code jobStatus} carries a non-empty trigger id.
     *
     * @param jobStatus job status to inspect
     * @return {@code true} if the trigger id is neither {@code null} nor empty
     */
    private static boolean savepointInProgress(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus) {
        CharSequence triggerId = (CharSequence)jobStatus.getSavepointInfo().getTriggerId();
        return !StringUtils.isEmpty(triggerId);
    }

    /**
     * Checks whether the upgrade mode was switched to {@link UpgradeMode#LAST_STATE} while high
     * availability is not activated in the observed configuration.
     *
     * @param flinkApp resource whose deployed and current specs are compared
     * @param observeConfig configuration checked for an activated high-availability mode
     * @return {@code true} if the deployed spec did not use {@code LAST_STATE}, the current spec
     *     does, and high availability is not activated in {@code observeConfig}
     */
    public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(AbstractFlinkResource<?, ?> flinkApp, Configuration observeConfig) {
        // Typed as AbstractFlinkSpec (not Object) so that getJob() resolves.
        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(flinkApp);
        UpgradeMode previousUpgradeMode = deployedSpec.getJob().getUpgradeMode();
        UpgradeMode currentUpgradeMode = ((AbstractFlinkSpec)flinkApp.getSpec()).getJob().getUpgradeMode();
        return previousUpgradeMode != UpgradeMode.LAST_STATE
                && currentUpgradeMode == UpgradeMode.LAST_STATE
                && !HighAvailabilityMode.isHighAvailabilityModeActivated(observeConfig);
    }

    /**
     * Returns the spec that is considered deployed for the given resource.
     *
     * @param deployment resource whose reconciliation status is read
     * @return the last stable spec when the reconciliation state is {@code ROLLED_BACK},
     *     otherwise the last reconciled spec
     */
    public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(AbstractFlinkResource<SPEC, ?> deployment) {
        ReconciliationStatus reconciliationStatus = ((CommonStatus)deployment.getStatus()).getReconciliationStatus();
        boolean rolledBack = reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK;
        if (rolledBack) {
            return (SPEC)reconciliationStatus.deserializeLastStableSpec();
        }
        return (SPEC)reconciliationStatus.deserializeLastReconciledSpec();
    }

    /**
     * Tells whether an upgrade or rollback has just started.
     *
     * @param currentStatus reconciliation status after the reconciliation
     * @param previousStatus reconciliation status before the reconciliation
     * @return {@code true} if the state changed, no scaling is in progress, and the new state is
     *     {@code UPGRADING} or {@code ROLLING_BACK}
     */
    private static boolean upgradeStarted(ReconciliationStatus<?> currentStatus, ReconciliationStatus<?> previousStatus) {
        ReconciliationState stateNow = currentStatus.getState();
        ReconciliationState stateBefore = previousStatus.getState();
        boolean stateChanged = stateNow != stateBefore;
        if (!stateChanged || currentStatus.scalingInProgress()) {
            return false;
        }
        return stateNow == ReconciliationState.UPGRADING || stateNow == ReconciliationState.ROLLING_BACK;
    }

    /**
     * Tells whether the job recorded in {@code status} is in a globally terminal state.
     *
     * <p>A status without a recorded job state is treated as not terminal instead of failing
     * with a {@link NullPointerException} from {@code JobStatus.valueOf(null)}.
     *
     * @param status status whose job state is inspected
     * @return {@code true} if a job state is recorded and it is globally terminal
     * @throws IllegalArgumentException if the recorded state is not a {@link JobStatus} name
     */
    public static boolean isJobInTerminalState(CommonStatus<?> status) {
        String jobState = status.getJobStatus().getState();
        if (jobState == null) {
            return false;
        }
        return JobStatus.valueOf(jobState).isGloballyTerminalState();
    }

    /**
     * Tells whether the job state recorded in {@code status} equals the name of
     * {@link JobStatus#RUNNING}.
     *
     * @param status status whose job state is inspected
     * @return {@code true} if the recorded state is {@code "RUNNING"}; {@code false} otherwise,
     *     including when no state is recorded
     */
    public static boolean isJobRunning(CommonStatus<?> status) {
        String recordedState = status.getJobStatus().getState();
        String runningName = JobStatus.RUNNING.name();
        return runningName.equals(recordedState);
    }

    /**
     * Records a validation error on the resource and resets its spec to the last reconciled one.
     *
     * <p>The error is logged and recorded only when it differs from the error already present in
     * the status. If a last reconciled spec exists, it replaces the resource's spec and the
     * resource generation is set to the observed generation from the status. While the
     * reconciliation state is {@code UPGRADING} or {@code ROLLING_BACK}, the job state of the
     * restored spec (if it has a job) is set to {@link JobState#RUNNING}.
     *
     * @param ctx context holding the resource that failed validation
     * @param validationError the validation error message; must not be {@code null}
     * @return {@code false} if there is no last reconciled spec to fall back to, {@code true} if
     *     the spec was reset
     */
    public static <SPEC extends AbstractFlinkSpec> boolean applyValidationErrorAndResetSpec(FlinkResourceContext<? extends AbstractFlinkResource<SPEC, ?>> ctx, String validationError) {
        AbstractFlinkResource<SPEC, ?> deployment = ctx.getResource();
        CommonStatus status = (CommonStatus)deployment.getStatus();
        if (!validationError.equals(status.getError())) {
            LOG.error("Validation failed: {}", validationError);
            ReconciliationUtils.updateForReconciliationError(ctx, new ValidationException(validationError));
        }

        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
        SpecWithMeta lastReconciledSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta();
        if (lastReconciledSpecWithMeta == null) {
            // Nothing has been reconciled yet, so there is no spec to fall back to.
            return false;
        }

        // The stored spec was serialized from this resource type, so the cast restores its static type.
        @SuppressWarnings("unchecked")
        SPEC lastReconciledSpec = (SPEC)lastReconciledSpecWithMeta.getSpec();
        deployment.setSpec(lastReconciledSpec);

        ReconciliationState reconciliationState = reconciliationStatus.getState();
        boolean changeInFlight = reconciliationState == ReconciliationState.UPGRADING || reconciliationState == ReconciliationState.ROLLING_BACK;
        JobSpec restoredJob = lastReconciledSpec.getJob();
        // Specs without a job have no job state to adjust.
        if (changeInFlight && restoredJob != null) {
            restoredJob.setState(JobState.RUNNING);
        }

        deployment.getMetadata().setGeneration(status.getObservedGeneration());
        return true;
    }

    /**
     * Handles a reconciliation failure and builds the error control returned to the operator
     * framework.
     *
     * <p>Logs the retry information when present, refreshes the resource status from the
     * recorder's cache, records {@code e} on the resource, and patches the status through the
     * recorder. Because the status is patched here, the returned control requests no further
     * status update.
     *
     * @param ctx context of the resource whose reconciliation failed
     * @param e the failure to record
     * @param statusRecorder recorder used to restore and patch the resource status
     * @return an error control that performs no status update
     */
    public static <STATUS extends CommonStatus<?>, R extends AbstractFlinkResource<?, STATUS>> ErrorStatusUpdateControl<R> toErrorStatusUpdateControl(FlinkResourceContext<R> ctx, Exception e, StatusRecorder<R, STATUS> statusRecorder) {
        // Chained on the typed Optional so the lambda parameter keeps its retry-info type.
        ctx.getJosdkContext()
                .getRetryInfo()
                .ifPresent(r -> LOG.warn("Attempt count: {}, last attempt: {}", r.getAttemptCount(), r.isLastAttempt()));
        statusRecorder.updateStatusFromCache(ctx.getResource());
        ReconciliationUtils.updateForReconciliationError(ctx, e);
        statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
        return ErrorStatusUpdateControl.noStatusUpdate();
    }

    /**
     * Returns the observed generation recorded in the resource status.
     *
     * @param resource resource whose status is read
     * @return the observed generation, or {@code -1} when none is recorded
     */
    public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> resource) {
        Long observedGeneration = ((CommonStatus)resource.getStatus()).getObservedGeneration();
        if (observedGeneration == null) {
            return -1L;
        }
        return observedGeneration;
    }

    /**
     * Clears the last reconciled spec when it belongs to a first deployment.
     *
     * <p>If the metadata stored with the last reconciled spec is flagged as a first deployment,
     * the last reconciled spec is removed and the reconciliation state is set to
     * {@code UPGRADING}. Nothing happens when no reconciled spec is stored or when it carries no
     * metadata.
     *
     * @param resource resource whose reconciliation status is updated
     */
    public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?, ?> resource) {
        ReconciliationStatus reconStatus = ((CommonStatus)resource.getStatus()).getReconciliationStatus();
        SpecWithMeta lastSpecWithMeta = reconStatus.deserializeLastReconciledSpecWithMeta();
        if (lastSpecWithMeta == null) {
            // No spec has been reconciled yet, so there is nothing to clear.
            return;
        }
        ReconciliationMetadata meta = (ReconciliationMetadata)lastSpecWithMeta.getMeta();
        if (meta == null || !meta.isFirstDeployment()) {
            return;
        }
        reconStatus.setLastReconciledSpec(null);
        reconStatus.setState(ReconciliationState.UPGRADING);
    }

    /**
     * Marks the last reconciled spec as stable when the observed job state allows it.
     *
     * <p>Only acts while the reconciliation state is {@code DEPLOYED}. The spec is marked stable
     * when the job is {@code RUNNING}, or when the job is {@code FINISHED} and the last
     * reconciled spec declared the job state {@link JobState#RUNNING}.
     *
     * @param status status holding the job state and the reconciliation status
     */
    public static void checkAndUpdateStableSpec(CommonStatus<?> status) {
        // Parsed before the state check, so an unparsable job state fails regardless of the reconciliation state.
        JobStatus observedJobStatus = JobStatus.valueOf((String)status.getJobStatus().getState());
        if (status.getReconciliationStatus().getState() != ReconciliationState.DEPLOYED) {
            return;
        }

        boolean stable = observedJobStatus == JobStatus.RUNNING;
        if (!stable) {
            JobState reconciledJobState = status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState();
            stable = reconciledJobState == JobState.RUNNING && observedJobStatus == JobStatus.FINISHED;
        }

        if (stable) {
            status.getReconciliationStatus().markReconciledSpecAsStable();
        }
    }

    /**
     * Updates the status of a resource whose upgrade turned out to be already applied.
     *
     * <p>When the last reconciled spec has a job, its state is set to {@link JobState#RUNNING}
     * and the job status state is set to {@code RECONCILING}. The reconciliation state becomes
     * {@code DEPLOYED} and the (possibly adjusted) last reconciled spec is written back together
     * with its metadata.
     *
     * @param resource resource whose status is updated
     */
    public static void updateStatusForAlreadyUpgraded(AbstractFlinkResource<?, ?> resource) {
        CommonStatus status = (CommonStatus)resource.getStatus();
        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
        SpecWithMeta lastSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta();

        JobSpec reconciledJob = lastSpecWithMeta.getSpec().getJob();
        boolean hasJob = reconciledJob != null;
        if (hasJob) {
            reconciledJob.setState(JobState.RUNNING);
            String reconcilingName = JobStatus.RECONCILING.name();
            status.getJobStatus().setState(reconcilingName);
        }

        reconciliationStatus.setState(ReconciliationState.DEPLOYED);
        reconciliationStatus.setLastReconciledSpec(SpecUtils.writeSpecWithMeta((AbstractFlinkSpec)lastSpecWithMeta.getSpec(), (ReconciliationMetadata)lastSpecWithMeta.getMeta()));
    }

    /**
     * Refreshes the metadata stored alongside the last reconciled spec.
     *
     * <p>New metadata is derived from {@code resource}. If it equals the stored metadata nothing
     * changes; otherwise the last reconciled spec is rewritten with the new metadata and the
     * observed generation in the status is set to the resource's current generation.
     *
     * @param resource resource whose reconciliation status is updated
     */
    public static <SPEC extends AbstractFlinkSpec> void updateReconciliationMetadata(AbstractFlinkResource<SPEC, ?> resource) {
        ReconciliationStatus reconciliationStatus = ((CommonStatus)resource.getStatus()).getReconciliationStatus();
        SpecWithMeta lastSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta();
        ReconciliationMetadata freshMeta = ReconciliationMetadata.from(resource);

        boolean metadataChanged = !freshMeta.equals((Object)lastSpecWithMeta.getMeta());
        if (metadataChanged) {
            reconciliationStatus.setLastReconciledSpec(SpecUtils.writeSpecWithMeta((AbstractFlinkSpec)lastSpecWithMeta.getSpec(), (ReconciliationMetadata)freshMeta));
            ((CommonStatus)resource.getStatus()).setObservedGeneration(resource.getMetadata().getGeneration());
        }
    }
}

