/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.service.SuspendMode;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reconciler for {@link FlinkDeployment} resources whose job is submitted through
 * {@link FlinkService#submitApplicationCluster}.
 *
 * <p>On top of the generic job reconciliation inherited from {@link AbstractJobReconciler}, this
 * class decides whether an upgrade can fall back to HA metadata, performs the actual deployment,
 * restarts unhealthy or lost deployments, and removes the JobManager deployment of terminal jobs
 * once a configured TTL has elapsed.
 */
public class ApplicationReconciler
        extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {

    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);

    /** Event message emitted when a lost deployment is being recovered. */
    static final String MSG_RECOVERY = "Recovering lost deployment";

    /** Event message emitted when an unhealthy job is being restarted. */
    static final String MSG_RESTART_UNHEALTHY = "Restarting unhealthy job";

    /**
     * Placeholder written as the savepoint path (and as the upgrade savepoint path in the status)
     * for deployments that require HA metadata and have at least one checkpoint.
     *
     * <p>NOTE(review): this literal may duplicate a constant declared in a parent class - confirm
     * and reference that constant instead if it exists.
     */
    private static final String LAST_STATE_SAVEPOINT_PLACEHOLDER = "KUBERNETES_OPERATOR_LAST_STATE";

    /**
     * Creates the reconciler; all arguments are passed unchanged to the parent constructor.
     *
     * @param eventRecorder recorder used to emit events
     * @param statusRecorder recorder used to patch and cache the resource status
     * @param autoscaler job autoscaler handed to the parent reconciler
     */
    public ApplicationReconciler(
            EventRecorder eventRecorder,
            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
            JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
        super(eventRecorder, statusRecorder, autoscaler);
    }

    /**
     * Determines how the job can be upgraded.
     *
     * <p>The parent's result is returned as-is when it is available or does not allow fallback.
     * Otherwise the following fallbacks are tried in order:
     *
     * <ol>
     *   <li>If HA is activated in both the deploy and the observe configuration and HA metadata is
     *       available, a last-state upgrade using HA metadata is returned.
     *   <li>If the JobManager deployment is not {@code MISSING}, the last reconciled spec did not
     *       use {@link UpgradeMode#LAST_STATE} and the JobManager pod never started, the
     *       deployment is deleted and the decision is re-evaluated from the start.
     *   <li>If the JobManager deployment is {@code MISSING} or {@code ERROR} and HA metadata is
     *       not available, the upgrade fails.
     * </ol>
     *
     * @param ctx context of the resource being reconciled
     * @param deployConfig configuration of the deployment to be created
     * @return the upgrade decision; {@code JobUpgrade.unavailable()} when no fallback applies
     * @throws UpgradeFailureException if the JobManager deployment is missing or in error and no
     *     HA metadata is available
     * @throws Exception propagated from the parent implementation or the Flink service
     */
    @Override
    protected AbstractJobReconciler.JobUpgrade getJobUpgrade(
            FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig)
            throws Exception {
        FlinkDeployment deployment = ctx.getResource();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();

        AbstractJobReconciler.JobUpgrade baseUpgrade = super.getJobUpgrade(ctx, deployConfig);
        if (baseUpgrade.isAvailable() || !baseUpgrade.isAllowFallback()) {
            return baseUpgrade;
        }

        FlinkService flinkService = ctx.getFlinkService();
        if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
                && HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
                && flinkService.isHaMetadataAvailable(deployConfig)) {
            LOG.info(
                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
            return AbstractJobReconciler.JobUpgrade.lastStateUsingHaMeta();
        }

        JobManagerDeploymentStatus jmDeployStatus = status.getJobManagerDeploymentStatus();
        if (jmDeployStatus != JobManagerDeploymentStatus.MISSING
                && ((FlinkDeploymentSpec)
                                        status.getReconciliationStatus()
                                                .deserializeLastReconciledSpec())
                                .getJob()
                                .getUpgradeMode()
                        != UpgradeMode.LAST_STATE
                && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
            deleteJmThatNeverStarted(flinkService, deployment, deployConfig);
            // Re-evaluate from scratch now that the never-started deployment is gone.
            return getJobUpgrade(ctx, deployConfig);
        }

        boolean jmMissingOrInError =
                jmDeployStatus == JobManagerDeploymentStatus.MISSING
                        || jmDeployStatus == JobManagerDeploymentStatus.ERROR;
        // HA metadata is only queried when the JobManager deployment is missing or in error.
        if (jmMissingOrInError && !flinkService.isHaMetadataAvailable(deployConfig)) {
            throw new UpgradeFailureException(
                    "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                            + "Manual restore required.",
                    "UpgradeFailed");
        }

        return AbstractJobReconciler.JobUpgrade.unavailable();
    }

    /**
     * Marks the job as {@link JobStatus#FAILED} in the status and deletes the cluster deployment.
     *
     * @param flinkService service used to delete the deployment
     * @param deployment resource whose deployment never started
     * @param deployConfig configuration passed to the delete call
     */
    private void deleteJmThatNeverStarted(
            FlinkService flinkService, FlinkDeployment deployment, Configuration deployConfig) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        status.getJobStatus().setState(JobStatus.FAILED);
        // NOTE(review): the trailing boolean presumably controls HA metadata deletion - confirm
        // against FlinkService#deleteClusterDeployment.
        flinkService.deleteClusterDeployment(deployment.getMetadata(), status, deployConfig, false);
        LOG.info("Deleted application cluster that never started.");
    }

    /**
     * Deploys the application cluster for the given spec.
     *
     * <p>Steps, in order: drop the cached cluster health info; set, replace or remove the
     * savepoint path in {@code deployConfig}; set the owner reference and a random job result
     * store path; delete a still-existing deployment (its job must be in a terminal state); make
     * sure a job id is assigned; emit a {@code Submit} event; submit the cluster; mark the status
     * as {@code RECONCILING}/{@code DEPLOYING}; and update ingress rules.
     *
     * @param ctx context of the resource being reconciled
     * @param spec spec to deploy
     * @param deployConfig configuration for the deployment; mutated by this method
     * @param savepoint savepoint path to restore from, if any
     * @param requireHaMetadata flag forwarded to the Flink service on submission; when set and at
     *     least one checkpoint exists, a placeholder savepoint path is configured
     * @throws IllegalArgumentException if a deployment still exists and its job is not terminal
     * @throws Exception propagated from the Flink service
     */
    @Override
    public void deploy(
            FlinkResourceContext<FlinkDeployment> ctx,
            FlinkDeploymentSpec spec,
            Configuration deployConfig,
            Optional<String> savepoint,
            boolean requireHaMetadata)
            throws Exception {
        FlinkDeployment relatedResource = ctx.getResource();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) relatedResource.getStatus();
        FlinkService flinkService = ctx.getFlinkService();

        ClusterHealthEvaluator.removeLastValidClusterHealthInfo(
                ((FlinkDeploymentStatus) relatedResource.getStatus()).getClusterInfo());

        if (savepoint.isPresent()) {
            deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
        } else if (requireHaMetadata && flinkService.atLeastOneCheckpoint(deployConfig)) {
            deployConfig.set(
                    SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_SAVEPOINT_PLACEHOLDER);
            status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_SAVEPOINT_PLACEHOLDER);
        } else {
            deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }

        setOwnerReference(relatedResource, deployConfig);
        setRandomJobResultStorePath(deployConfig);

        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
            LOG.info("Deleting cluster with terminated application before new deployment");
            flinkService.deleteClusterDeployment(
                    relatedResource.getMetadata(), status, deployConfig, !requireHaMetadata);
            statusRecorder.patchAndCacheStatus(relatedResource, ctx.getKubernetesClient());
        }

        setJobIdIfNecessary(
                relatedResource, deployConfig, ctx.getKubernetesClient(), requireHaMetadata);

        eventRecorder.triggerEvent(
                (AbstractFlinkResource<?, ?>) relatedResource,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.Submit,
                EventRecorder.Component.JobManagerDeployment,
                "Starting deployment",
                ctx.getKubernetesClient());
        flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
        status.getJobStatus().setState(JobStatus.RECONCILING);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);

        IngressUtils.updateIngressRules(
                relatedResource.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
    }

    /**
     * Ensures the status carries a job id and that {@code deployConfig} uses it.
     *
     * <p>A job id already present in {@code deployConfig} is copied into the status and left
     * untouched in the configuration. Otherwise a new id is generated when the status has none or
     * when this is not a last-state deployment, and the id from the status is written into
     * {@code deployConfig}. Every status change is patched immediately.
     *
     * @param resource resource whose status holds the job id
     * @param deployConfig configuration to read the user-provided id from / write the id to
     * @param client Kubernetes client used to patch the status
     * @param lastStateDeploy whether this is a last-state deployment, in which case an existing id
     *     is kept
     */
    private void setJobIdIfNecessary(
            FlinkDeployment resource,
            Configuration deployConfig,
            KubernetesClient client,
            boolean lastStateDeploy) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) resource.getStatus();

        String userJobId = (String) deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
        if (userJobId != null) {
            status.getJobStatus().setJobId(userJobId);
            statusRecorder.patchAndCacheStatus(resource, client);
            return;
        }

        if (status.getJobStatus().getJobId() == null || !lastStateDeploy) {
            String generatedJobId = JobID.generate().toHexString();
            // Persisted before the cluster is submitted - presumably so that a retried
            // deployment reuses the same id; confirm with the status recorder semantics.
            status.getJobStatus().setJobId(generatedJobId);
            LOG.info("Assigning JobId override to {}", generatedJobId);
            statusRecorder.patchAndCacheStatus(resource, client);
        }

        String jobId = status.getJobStatus().getJobId();
        LOG.debug("Setting {} to {}", PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
        deployConfig.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
    }

    /**
     * Cancels the job through the Flink service and records the resulting savepoint path, if any,
     * as the upgrade savepoint path.
     *
     * @param ctx context of the resource being reconciled
     * @param suspendMode how the job should be suspended
     * @return the {@code isPending()} flag of the cancel result
     * @throws Exception propagated from the Flink service
     */
    @Override
    protected boolean cancelJob(FlinkResourceContext<FlinkDeployment> ctx, SuspendMode suspendMode)
            throws Exception {
        FlinkService.CancelResult result =
                ctx.getFlinkService()
                        .cancelJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
        result.getSavepointPath()
                .ifPresent(location -> setUpgradeSavepointPath(ctx, (String) location));
        return result.isPending();
    }

    /**
     * Deletes the cluster deployment of the resource using the deploy configuration derived from
     * its current spec.
     *
     * @param ctx context of the resource being reconciled
     */
    @Override
    protected void cleanupAfterFailedJob(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkService flinkService = ctx.getFlinkService();
        Configuration conf = ctx.getDeployConfig((AbstractFlinkSpec) ctx.getResource().getSpec());
        flinkService.deleteClusterDeployment(
                ctx.getResource().getMetadata(),
                (FlinkDeploymentStatus) ctx.getResource().getStatus(),
                conf,
                false);
    }

    /**
     * Points the job result store at a fresh random directory below the HA storage path.
     *
     * <p>Nothing is changed when no HA storage path is configured. {@code DELETE_ON_COMMIT} is set
     * to {@code false} only if it is not configured already. The resulting path is
     * {@code <ha-storage-path>/job-result-store/<cluster-id>/<random-uuid>}.
     *
     * @param effectiveConfig configuration to update in place
     */
    private static void setRandomJobResultStorePath(Configuration effectiveConfig) {
        if (!effectiveConfig.contains(HighAvailabilityOptions.HA_STORAGE_PATH)) {
            return;
        }
        if (!effectiveConfig.contains(JobResultStoreOptions.DELETE_ON_COMMIT)) {
            effectiveConfig.set(JobResultStoreOptions.DELETE_ON_COMMIT, false);
        }
        String storagePath =
                effectiveConfig.getString(HighAvailabilityOptions.HA_STORAGE_PATH)
                        + "/job-result-store/"
                        + effectiveConfig.getString(KubernetesConfigOptions.CLUSTER_ID)
                        + "/"
                        + UUID.randomUUID();
        effectiveConfig.set(JobResultStoreOptions.STORAGE_PATH, storagePath);
    }

    /**
     * Handles changes that are not spec changes.
     *
     * <p>After the parent had its turn, the job is resubmitted when the cluster is unhealthy or
     * the deployment needs recovery; a warning event is emitted for each reason that applies, and
     * an unhealthy deployment is cleaned up first. If neither applies, the JobManager deployment
     * of a terminal job is removed once its TTL has passed.
     *
     * @param ctx context of the resource being reconciled
     * @return {@code true} if any action was taken
     * @throws Exception propagated from the parent implementation or from resubmission
     */
    @Override
    public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
            throws Exception {
        if (super.reconcileOtherChanges(ctx)) {
            return true;
        }

        FlinkDeployment deployment = ctx.getResource();
        Configuration observeConfig = ctx.getObserveConfig();

        // Both checks are always evaluated, in this order; the health check clears the cached
        // health info as a side effect when it decides to restart.
        boolean restartUnhealthy = shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
        boolean recoverDeployment = shouldRecoverDeployment(observeConfig, deployment);

        if (!restartUnhealthy && !recoverDeployment) {
            return cleanupTerminalJmAfterTtl(ctx.getFlinkService(), deployment, observeConfig);
        }

        if (recoverDeployment) {
            eventRecorder.triggerEvent(
                    (AbstractFlinkResource<?, ?>) deployment,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.RecoverDeployment,
                    EventRecorder.Component.Job,
                    MSG_RECOVERY,
                    ctx.getKubernetesClient());
        }
        if (restartUnhealthy) {
            eventRecorder.triggerEvent(
                    (AbstractFlinkResource<?, ?>) deployment,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.RestartUnhealthyJob,
                    EventRecorder.Component.Job,
                    MSG_RESTART_UNHEALTHY,
                    ctx.getKubernetesClient());
            cleanupAfterFailedJob(ctx);
        }

        resubmitJob(
                ctx, HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig()));
        return true;
    }

    /**
     * Decides whether the job must be restarted because the cluster was reported unhealthy.
     *
     * <p>A restart is requested only if the health check is enabled, health info is present and
     * unhealthy, and either the spec's upgrade mode is {@link UpgradeMode#STATELESS} or HA is
     * activated. When a restart is requested the stored health info is removed.
     *
     * @param deployment resource to inspect
     * @param observeConfig configuration used for the feature flag and the HA check
     * @return {@code true} if the job should be restarted
     */
    private boolean shouldRestartJobBecauseUnhealthy(
            FlinkDeployment deployment, Configuration observeConfig) {
        if (!observeConfig.getBoolean(
                KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
            return false;
        }

        Map<String, String> clusterInfo =
                ((FlinkDeploymentStatus) deployment.getStatus()).getClusterInfo();
        ClusterHealthInfo clusterHealthInfo =
                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
        if (clusterHealthInfo == null) {
            LOG.debug("Cluster info not contains job health info, skipping health check");
            return false;
        }

        LOG.debug("Cluster info contains job health info");
        if (clusterHealthInfo.isHealthy()) {
            return false;
        }

        boolean restartNeeded = false;
        if (((FlinkDeploymentSpec) deployment.getSpec()).getJob().getUpgradeMode()
                == UpgradeMode.STATELESS) {
            LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
            restartNeeded = true;
        } else if (HighAvailabilityMode.isHighAvailabilityModeActivated(observeConfig)) {
            LOG.debug("HA is enabled, recovering unhealthy jobmanager deployment");
            restartNeeded = true;
        } else {
            LOG.warn("Could not recover unhealthy jobmanager deployment without HA enabled");
        }

        if (restartNeeded) {
            ClusterHealthEvaluator.removeLastValidClusterHealthInfo(clusterInfo);
        }
        return restartNeeded;
    }

    /**
     * Deletes the JobManager deployment of a terminal job once the shutdown TTL has elapsed.
     *
     * <p>The TTL is counted from the job status update time, which is parsed as epoch
     * milliseconds.
     *
     * @param flinkService service used to delete the deployment
     * @param deployment resource to inspect
     * @param observeConfig configuration providing the TTL; also passed to the delete call
     * @return {@code true} if the deployment was deleted
     */
    private boolean cleanupTerminalJmAfterTtl(
            FlinkService flinkService, FlinkDeployment deployment, Configuration observeConfig) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        boolean terminal = ReconciliationUtils.isJobInTerminalState(status);
        boolean jmStillRunning =
                status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING;
        if (!terminal || !jmStillRunning) {
            return false;
        }

        Duration ttl =
                (Duration) observeConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL);
        Instant lastUpdate =
                Instant.ofEpochMilli(Long.parseLong(status.getJobStatus().getUpdateTime()));
        if (!clock.instant().isAfter(lastUpdate.plus(ttl))) {
            return false;
        }

        LOG.info("Removing JobManager deployment for terminal application.");
        flinkService.deleteClusterDeployment(deployment.getMetadata(), status, observeConfig, false);
        return true;
    }

    /**
     * Cleans up when the resource is deleted.
     *
     * <p>If the resource was never deployed or its job is already terminal, the cluster
     * deployment is deleted directly. Otherwise the job is cancelled, with a savepoint when
     * {@code SAVEPOINT_ON_DELETION} is enabled in the observe configuration and statelessly
     * otherwise.
     *
     * <p>Any exception thrown by {@link #cancelJob} propagates unchanged to the caller, including
     * checked ones, although this method declares no {@code throws} clause.
     *
     * @param ctx context of the resource being deleted
     * @return {@link DeleteControl#defaultDelete()}
     */
    @Override
    protected DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment deployment = ctx.getResource();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        Configuration conf = ctx.getDeployConfig((AbstractFlinkSpec) ctx.getResource().getSpec());

        if (status.getReconciliationStatus().isBeforeFirstDeployment()
                || ReconciliationUtils.isJobInTerminalState(status)) {
            ctx.getFlinkService()
                    .deleteClusterDeployment(deployment.getMetadata(), status, conf, true);
        } else {
            Configuration observeConfig = ctx.getObserveConfig();
            SuspendMode suspendMode =
                    observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
                            ? SuspendMode.SAVEPOINT
                            : SuspendMode.STATELESS;
            try {
                cancelJob(ctx, suspendMode);
            } catch (Exception e) {
                // cancelJob declares a checked exception that this override cannot declare;
                // rethrow the very same instance so callers observe the original failure.
                throw ApplicationReconciler.<RuntimeException>rethrowUnchecked(e);
            }
        }
        return DeleteControl.defaultDelete();
    }

    /**
     * Rethrows {@code t} as-is without the compiler requiring a {@code throws} clause at the call
     * site. The cast to {@code E} is erased, so no wrapping or type change happens at runtime.
     *
     * @param t throwable to rethrow
     * @param <E> exception type the call site pretends to throw
     * @return never returns; declared so call sites can write {@code throw rethrowUnchecked(e)}
     * @throws E always; the actual thrown object is {@code t}
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable t) throws E {
        throw (E) t;
    }
}

