/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.time.Duration;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.reconciler.ReconciliationMetadata;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.exception.ValidationException;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReconciliationUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationUtils.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
        JobSpec job = ((AbstractFlinkSpec)target.getSpec()).getJob();
        ReconciliationUtils.updateStatusForSpecReconciliation(target, job != null ? job.getState() : null, conf, false);
    }

    public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
        ReconciliationUtils.updateStatusForSpecReconciliation(target, JobState.SUSPENDED, conf, true);
    }

    private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(AbstractFlinkResource<SPEC, ?> target, JobState stateAfterReconcile, Configuration conf, boolean upgrading) {
        CommonStatus status = (CommonStatus)target.getStatus();
        AbstractFlinkSpec spec = (AbstractFlinkSpec)target.getSpec();
        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
        status.setError(null);
        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
        reconciliationStatus.setState(upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
        if (spec.getJob() != null) {
            AbstractFlinkSpec clonedSpec = ReconciliationUtils.clone(spec);
            JobSpec job = clonedSpec.getJob();
            job.setState(stateAfterReconcile);
            AbstractFlinkSpec lastSpec = reconciliationStatus.deserializeLastReconciledSpec();
            if (lastSpec != null) {
                job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce());
            }
            if (target instanceof FlinkDeployment) {
                ((FlinkDeploymentStatus)status).setTaskManager(ReconciliationUtils.getTaskManagerInfo(target.getMetadata().getName(), conf, stateAfterReconcile));
            }
            reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
            if (spec.getJob().getState() == JobState.SUSPENDED) {
                reconciliationStatus.markReconciledSpecAsStable();
            }
        } else {
            reconciliationStatus.serializeAndSetLastReconciledSpec(spec, target);
        }
    }

    public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce(SavepointInfo savepointInfo, AbstractFlinkResource<SPEC, ?> target) {
        if (savepointInfo.getTriggerType() != SavepointTriggerType.MANUAL) {
            return;
        }
        CommonStatus commonStatus = (CommonStatus)target.getStatus();
        AbstractFlinkSpec spec = (AbstractFlinkSpec)target.getSpec();
        ReconciliationStatus reconciliationStatus = commonStatus.getReconciliationStatus();
        AbstractFlinkSpec lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
        lastReconciledSpec.getJob().setSavepointTriggerNonce(spec.getJob().getSavepointTriggerNonce());
        reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target);
        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
    }

    private static TaskManagerInfo getTaskManagerInfo(String name, Configuration conf, JobState jobState) {
        String labelSelector = "component=taskmanager,app=" + name;
        if (jobState == JobState.RUNNING) {
            return new TaskManagerInfo(labelSelector, FlinkUtils.getNumTaskManagers(conf));
        }
        return new TaskManagerInfo("", 0);
    }

    public static void updateForReconciliationError(AbstractFlinkResource<?, ?> target, Throwable error, FlinkOperatorConfiguration conf) {
        FlinkResourceExceptionUtils.updateFlinkResourceException(error, target, conf);
    }

    public static <T> T clone(T object) {
        return (T)SpecUtils.clone(object);
    }

    public static <SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>, R extends CustomResource<SPEC, STATUS>> UpdateControl<R> toUpdateControl(FlinkOperatorConfiguration operatorConfiguration, R current, R previous, boolean reschedule) {
        CommonStatus status = (CommonStatus)current.getStatus();
        UpdateControl updateControl = UpdateControl.noUpdate();
        if (!reschedule) {
            return updateControl;
        }
        if (ReconciliationUtils.upgradeStarted(status.getReconciliationStatus().getState(), ((CommonStatus)previous.getStatus()).getReconciliationStatus().getState())) {
            return (UpdateControl)updateControl.rescheduleAfter(0L);
        }
        if (status instanceof FlinkDeploymentStatus) {
            return (UpdateControl)updateControl.rescheduleAfter(ReconciliationUtils.rescheduleAfter(((FlinkDeploymentStatus)status).getJobManagerDeploymentStatus(), (FlinkDeployment)current, operatorConfiguration).toMillis());
        }
        return (UpdateControl)updateControl.rescheduleAfter(operatorConfiguration.getReconcileInterval().toMillis());
    }

    public static Duration rescheduleAfter(JobManagerDeploymentStatus status, FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration) {
        Duration rescheduleAfter;
        switch (status) {
            case DEPLOYING: {
                rescheduleAfter = operatorConfiguration.getProgressCheckInterval();
                break;
            }
            case READY: {
                rescheduleAfter = ReconciliationUtils.savepointInProgress(((FlinkDeploymentStatus)flinkDeployment.getStatus()).getJobStatus()) ? operatorConfiguration.getProgressCheckInterval() : operatorConfiguration.getReconcileInterval();
                break;
            }
            case MISSING: 
            case ERROR: {
                rescheduleAfter = operatorConfiguration.getReconcileInterval();
                break;
            }
            case DEPLOYED_NOT_READY: {
                rescheduleAfter = operatorConfiguration.getRestApiReadyDelay();
                break;
            }
            default: {
                throw new RuntimeException("Unknown status: " + status);
            }
        }
        return rescheduleAfter;
    }

    private static boolean savepointInProgress(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus) {
        return StringUtils.isNotEmpty((CharSequence)jobStatus.getSavepointInfo().getTriggerId());
    }

    public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(AbstractFlinkResource<?, ?> flinkApp, Configuration observeConfig) {
        Object deployedSpec = ReconciliationUtils.getDeployedSpec(flinkApp);
        UpgradeMode previousUpgradeMode = deployedSpec.getJob().getUpgradeMode();
        UpgradeMode currentUpgradeMode = ((AbstractFlinkSpec)flinkApp.getSpec()).getJob().getUpgradeMode();
        return previousUpgradeMode != UpgradeMode.LAST_STATE && currentUpgradeMode == UpgradeMode.LAST_STATE && !FlinkUtils.isKubernetesHAActivated(observeConfig);
    }

    public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(AbstractFlinkResource<SPEC, ?> deployment) {
        ReconciliationStatus reconciliationStatus = ((CommonStatus)deployment.getStatus()).getReconciliationStatus();
        ReconciliationState reconciliationState = reconciliationStatus.getState();
        if (reconciliationState != ReconciliationState.ROLLED_BACK) {
            return (SPEC)reconciliationStatus.deserializeLastReconciledSpec();
        }
        return (SPEC)reconciliationStatus.deserializeLastStableSpec();
    }

    private static boolean upgradeStarted(ReconciliationState currentReconState, ReconciliationState previousReconState) {
        if (currentReconState == previousReconState) {
            return false;
        }
        return currentReconState == ReconciliationState.ROLLING_BACK || currentReconState == ReconciliationState.UPGRADING;
    }

    public static boolean isJobInTerminalState(CommonStatus<?> status) {
        String jobState = status.getJobStatus().getState();
        return JobStatus.valueOf((String)jobState).isGloballyTerminalState();
    }

    public static boolean isJobRunning(CommonStatus<?> status) {
        return JobStatus.RUNNING.name().equals(status.getJobStatus().getState());
    }

    public static <SPEC extends AbstractFlinkSpec> boolean applyValidationErrorAndResetSpec(AbstractFlinkResource<SPEC, ?> deployment, String validationError, FlinkOperatorConfiguration conf) {
        AbstractFlinkSpec lastReconciledSpec;
        CommonStatus status = (CommonStatus)deployment.getStatus();
        if (!validationError.equals(status.getError())) {
            LOG.error("Validation failed: " + validationError);
            ReconciliationUtils.updateForReconciliationError(deployment, new ValidationException(validationError), conf);
        }
        if ((lastReconciledSpec = status.getReconciliationStatus().deserializeLastReconciledSpec()) == null) {
            return false;
        }
        deployment.setSpec((Object)lastReconciledSpec);
        if (status.getReconciliationStatus().getState() == ReconciliationState.UPGRADING) {
            ((AbstractFlinkSpec)deployment.getSpec()).getJob().setState(JobState.RUNNING);
        }
        return true;
    }

    public static <STATUS extends CommonStatus<?>, R extends AbstractFlinkResource<?, STATUS>> ErrorStatusUpdateControl<R> toErrorStatusUpdateControl(R resource, Optional<RetryInfo> retryInfo, Exception e, StatusRecorder<R, STATUS> statusRecorder, FlinkOperatorConfiguration operatorConfiguration) {
        retryInfo.ifPresent(r -> LOG.warn("Attempt count: {}, last attempt: {}", (Object)r.getAttemptCount(), (Object)r.isLastAttempt()));
        statusRecorder.updateStatusFromCache(resource);
        ReconciliationUtils.updateForReconciliationError(resource, e, operatorConfiguration);
        statusRecorder.patchAndCacheStatus(resource);
        return ErrorStatusUpdateControl.noStatusUpdate();
    }

    public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> resource) {
        SpecWithMeta lastSpecWithMeta = ((CommonStatus)resource.getStatus()).getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
        if (lastSpecWithMeta.getMeta() == null) {
            return -1L;
        }
        return lastSpecWithMeta.getMeta().getMetadata().getGeneration();
    }

    public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?, ?> resource) {
        ReconciliationStatus reconStatus = ((CommonStatus)resource.getStatus()).getReconciliationStatus();
        SpecWithMeta lastSpecWithMeta = reconStatus.deserializeLastReconciledSpecWithMeta();
        if (lastSpecWithMeta.getMeta() == null) {
            return;
        }
        if (lastSpecWithMeta.getMeta().isFirstDeployment()) {
            reconStatus.setLastReconciledSpec(null);
            reconStatus.setState(ReconciliationState.UPGRADING);
        }
    }

    public static void checkAndUpdateStableSpec(CommonStatus<?> status) {
        JobStatus flinkJobStatus = JobStatus.valueOf((String)status.getJobStatus().getState());
        if (status.getReconciliationStatus().getState() != ReconciliationState.DEPLOYED) {
            return;
        }
        if (flinkJobStatus == JobStatus.RUNNING) {
            status.getReconciliationStatus().markReconciledSpecAsStable();
            return;
        }
        JobState reconciledJobState = status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState();
        if (reconciledJobState == JobState.RUNNING && flinkJobStatus == JobStatus.FINISHED) {
            status.getReconciliationStatus().markReconciledSpecAsStable();
        }
    }

    public static <SPEC extends AbstractFlinkSpec> void updateStatusForAlreadyUpgraded(AbstractFlinkResource<SPEC, ?> resource) {
        ReconciliationStatus reconciliationStatus = ((CommonStatus)resource.getStatus()).getReconciliationStatus();
        SpecWithMeta lastSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta();
        JobSpec lastJobSpec = lastSpecWithMeta.getSpec().getJob();
        if (lastJobSpec != null) {
            lastJobSpec.setState(JobState.RUNNING);
        }
        reconciliationStatus.setState(ReconciliationState.DEPLOYED);
        reconciliationStatus.setLastReconciledSpec(SpecUtils.writeSpecWithMeta((AbstractFlinkSpec)lastSpecWithMeta.getSpec(), (ReconciliationMetadata)lastSpecWithMeta.getMeta()));
    }
}

