/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationReconciler
extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
    static final String MSG_RECOVERY = "Recovering lost deployment";
    static final String MSG_RESTART_UNHEALTHY = "Restarting unhealthy job";
    protected final FlinkService flinkService;

    public ApplicationReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager configManager, EventRecorder eventRecorder, StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
        this.flinkService = flinkService;
    }

    @Override
    protected FlinkService getFlinkService(FlinkDeployment resource, Context<?> context) {
        return this.flinkService;
    }

    @Override
    protected Configuration getObserveConfig(FlinkDeployment deployment, Context<?> context) {
        return this.configManager.getObserveConfig(deployment);
    }

    @Override
    protected Configuration getDeployConfig(ObjectMeta deployMeta, FlinkDeploymentSpec currentDeploySpec, Context<?> context) {
        return this.configManager.getDeployConfig(deployMeta, currentDeploySpec);
    }

    @Override
    protected Optional<UpgradeMode> getAvailableUpgradeMode(FlinkDeployment deployment, Configuration deployConfig, Configuration observeConfig) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus)deployment.getStatus();
        Optional<UpgradeMode> availableUpgradeMode = super.getAvailableUpgradeMode(deployment, deployConfig, observeConfig);
        if (availableUpgradeMode.isPresent()) {
            return availableUpgradeMode;
        }
        if (deployConfig.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) && FlinkUtils.isKubernetesHAActivated(deployConfig) && FlinkUtils.isKubernetesHAActivated(observeConfig) && !this.flinkVersionChanged((FlinkDeploymentSpec)ReconciliationUtils.getDeployedSpec(deployment), (FlinkDeploymentSpec)deployment.getSpec())) {
            if (!this.flinkService.isHaMetadataAvailable(deployConfig)) {
                if (((FlinkDeploymentStatus)deployment.getStatus()).getReconciliationStatus().getLastStableSpec() == null) {
                    return this.resetOnMissingStableSpec(deployment, deployConfig);
                }
            } else {
                LOG.info("Job is not running but HA metadata is available for last state restore, ready for upgrade");
                return Optional.of(UpgradeMode.LAST_STATE);
            }
        }
        if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING || status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
            throw new RecoveryFailureException("JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "UpgradeFailed");
        }
        LOG.info("Job is not running yet and HA metadata is not available, waiting for upgradeable state");
        return Optional.empty();
    }

    private Optional<UpgradeMode> resetOnMissingStableSpec(FlinkDeployment deployment, Configuration deployConfig) {
        this.flinkService.deleteClusterDeployment(deployment.getMetadata(), (FlinkDeploymentStatus)deployment.getStatus(), false);
        this.flinkService.waitForClusterShutdown(deployConfig);
        if (!this.flinkService.isHaMetadataAvailable(deployConfig)) {
            LOG.info("Job never entered stable state. Resetting status for initial deploy");
            ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
            return Optional.empty();
        }
        LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
        return Optional.of(UpgradeMode.LAST_STATE);
    }

    @Override
    protected void deploy(FlinkDeployment relatedResource, FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Context<?> ctx, Configuration deployConfig, Optional<String> savepoint, boolean requireHaMetadata) throws Exception {
        if (savepoint.isPresent()) {
            deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, (Object)savepoint.get());
        } else {
            deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }
        this.setOwnerReference(relatedResource, deployConfig);
        ApplicationReconciler.setRandomJobResultStorePath(deployConfig);
        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            if (!ReconciliationUtils.isJobInTerminalState(status)) {
                LOG.error("Invalid status for deployment: {}", (Object)status);
                throw new RuntimeException("This indicates a bug...");
            }
            LOG.info("Deleting deployment with terminated application before new deployment");
            this.flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
            this.flinkService.waitForClusterShutdown(deployConfig);
        }
        this.setJobIdIfNecessary(spec, relatedResource, deployConfig);
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)relatedResource, EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.JobManagerDeployment, "Starting deployment");
        this.flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
        status.getJobStatus().setState(JobStatus.RECONCILING.name());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(relatedResource.getMetadata(), spec, deployConfig, this.kubernetesClient);
    }

    private void setJobIdIfNecessary(FlinkDeploymentSpec spec, FlinkDeployment resource, Configuration deployConfig) {
        String jobId;
        if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) {
            return;
        }
        FlinkDeploymentStatus status = (FlinkDeploymentStatus)resource.getStatus();
        if (status.getJobStatus().getJobId() == null || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
            jobId = JobID.generate().toHexString();
            status.getJobStatus().setJobId(jobId);
            LOG.info("Assigning JobId override to {}", (Object)jobId);
            this.statusRecorder.patchAndCacheStatus(resource);
        }
        jobId = status.getJobStatus().getJobId();
        LOG.debug("Setting {} to {}", (Object)PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, (Object)jobId);
        deployConfig.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, (Object)jobId);
    }

    @Override
    protected void cancelJob(FlinkDeployment deployment, Context<?> ctx, UpgradeMode upgradeMode, Configuration observeConfig) throws Exception {
        this.flinkService.cancelJob(deployment, upgradeMode, observeConfig);
    }

    @Override
    protected void cleanupAfterFailedJob(FlinkDeployment deployment, Context<?> ctx, Configuration observeConfig) {
        this.flinkService.deleteClusterDeployment(deployment.getMetadata(), (FlinkDeploymentStatus)deployment.getStatus(), false);
    }

    private static void setRandomJobResultStorePath(Configuration effectiveConfig) {
        if (effectiveConfig.contains(HighAvailabilityOptions.HA_STORAGE_PATH)) {
            if (!effectiveConfig.contains(JobResultStoreOptions.DELETE_ON_COMMIT)) {
                effectiveConfig.set(JobResultStoreOptions.DELETE_ON_COMMIT, (Object)false);
            }
            effectiveConfig.set(JobResultStoreOptions.STORAGE_PATH, (Object)(effectiveConfig.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/job-result-store/" + effectiveConfig.getString(KubernetesConfigOptions.CLUSTER_ID) + "/" + UUID.randomUUID()));
        }
    }

    @Override
    public boolean reconcileOtherChanges(FlinkDeployment deployment, Context<?> ctx, Configuration observeConfig) throws Exception {
        if (super.reconcileOtherChanges(deployment, ctx, observeConfig)) {
            return true;
        }
        boolean shouldRestartJobBecauseUnhealthy = this.shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
        boolean shouldRecoverDeployment = this.shouldRecoverDeployment(observeConfig, deployment);
        if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) {
            if (shouldRecoverDeployment) {
                this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)deployment, EventRecorder.Type.Warning, EventRecorder.Reason.RecoverDeployment, EventRecorder.Component.Job, MSG_RECOVERY);
            }
            if (shouldRestartJobBecauseUnhealthy) {
                this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)deployment, EventRecorder.Type.Warning, EventRecorder.Reason.RestartUnhealthyJob, EventRecorder.Component.Job, MSG_RESTART_UNHEALTHY);
                this.cleanupAfterFailedJob(deployment, ctx, observeConfig);
            }
            this.resubmitJob(deployment, ctx, observeConfig, true);
            return true;
        }
        return this.cleanupTerminalJmAfterTtl(deployment, observeConfig);
    }

    private boolean shouldRestartJobBecauseUnhealthy(FlinkDeployment deployment, Configuration observeConfig) {
        boolean restartNeeded = false;
        if (observeConfig.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
            Map clusterInfo = ((FlinkDeploymentStatus)deployment.getStatus()).getClusterInfo();
            ClusterHealthInfo clusterHealthInfo = ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
            if (clusterHealthInfo != null) {
                LOG.debug("Cluster info contains job health info");
                if (!clusterHealthInfo.isHealthy()) {
                    if (((FlinkDeploymentSpec)deployment.getSpec()).getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
                        LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
                        restartNeeded = true;
                    } else if (FlinkUtils.isKubernetesHAActivated(observeConfig)) {
                        LOG.debug("HA is enabled, recovering unhealthy jobmanager deployment");
                        restartNeeded = true;
                    } else {
                        LOG.warn("Could not recover unhealthy jobmanager deployment without HA enabled");
                    }
                    if (restartNeeded) {
                        ClusterHealthEvaluator.removeLastValidClusterHealthInfo(clusterInfo);
                    }
                }
            } else {
                LOG.debug("Cluster info not contains job health info, skipping health check");
            }
        }
        return restartNeeded;
    }

    private boolean cleanupTerminalJmAfterTtl(FlinkDeployment deployment, Configuration observeConfig) {
        boolean jmStillRunning;
        FlinkDeploymentStatus status = (FlinkDeploymentStatus)deployment.getStatus();
        boolean terminal = ReconciliationUtils.isJobInTerminalState(status);
        boolean bl = jmStillRunning = status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING;
        if (terminal && jmStillRunning) {
            Duration ttl = (Duration)observeConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL);
            boolean ttlPassed = this.clock.instant().isAfter(Instant.ofEpochMilli(Long.parseLong(status.getJobStatus().getUpdateTime())).plus(ttl));
            if (ttlPassed) {
                LOG.info("Removing JobManager deployment for terminal application.");
                this.flinkService.deleteClusterDeployment(deployment.getMetadata(), status, false);
                return true;
            }
        }
        return false;
    }

    @Override
    protected DeleteControl cleanupInternal(FlinkDeployment deployment, Context<?> context) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus)deployment.getStatus();
        if (status.getReconciliationStatus().isBeforeFirstDeployment()) {
            this.flinkService.deleteClusterDeployment(deployment.getMetadata(), status, true);
        } else {
            this.flinkService.cancelJob(deployment, UpgradeMode.STATELESS, this.configManager.getObserveConfig(deployment));
        }
        return DeleteControl.defaultDelete();
    }
}

