/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer.deployment;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.MissingJobManagerException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base observer for {@link FlinkDeployment} resources.
 *
 * <p>Each observe pass first tracks the JobManager Kubernetes deployment until it is marked
 * {@link JobManagerDeploymentStatus#READY}, then delegates to the subclass via
 * {@link #observeFlinkCluster} and finally refreshes the cluster info stored in the status.
 */
public abstract class AbstractFlinkDeploymentObserver
        extends AbstractFlinkResourceObserver<FlinkDeployment, FlinkDeploymentObserverContext> {

    /** Annotation on the JobManager deployment holding the generation it was deployed from. */
    private static final String GENERATION_ANNOTATION =
            "flinkdeployment.flink.apache.org/generation";

    /** Container waiting reasons that are reported as a failed deployment. */
    private static final Set<String> CONTAINER_BACKOFF_REASONS =
            Set.of("CrashLoopBackOff", "ImagePullBackOff");

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    protected final FlinkService flinkService;

    /**
     * @param flinkService service used to query the JobManager port, pods and cluster info
     * @param configManager passed through to the parent observer
     * @param eventRecorder passed through to the parent observer
     */
    public AbstractFlinkDeploymentObserver(
            FlinkService flinkService,
            FlinkConfigManager configManager,
            EventRecorder eventRecorder) {
        super(configManager, eventRecorder);
        this.flinkService = flinkService;
    }

    @Override
    protected FlinkDeploymentObserverContext getObserverContext(
            FlinkDeployment resource, Context<?> context) {
        return new FlinkDeploymentObserverContext(resource, this.configManager);
    }

    /**
     * Runs one observe pass: JobManager deployment (only while it is not READY), then the Flink
     * cluster and the cluster info (only once it is READY), then clears stale errors.
     */
    @Override
    public void observeInternal(
            FlinkDeployment flinkDep,
            Context<?> context,
            FlinkDeploymentObserverContext observerContext) {
        if (!isJmDeploymentReady(flinkDep)) {
            observeJmDeployment(flinkDep, context, observerContext.getDeployedConfig());
        }
        if (isJmDeploymentReady(flinkDep)) {
            observeFlinkCluster(flinkDep, context, observerContext);
        }
        // Checked again on purpose: observeFlinkCluster receives the resource and may have
        // updated the JobManager deployment status.
        if (isJmDeploymentReady(flinkDep)) {
            observeClusterInfo(flinkDep, observerContext.getDeployedConfig());
        }
        clearErrorsIfDeploymentIsHealthy(flinkDep);
    }

    /**
     * Merges the cluster info returned by the Flink service into the resource status. This is
     * best effort: any exception is logged and otherwise ignored.
     */
    private void observeClusterInfo(FlinkDeployment flinkApp, Configuration configuration) {
        try {
            Map<String, String> clusterInfo = flinkService.getClusterInfo(configuration);
            flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
            logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
        } catch (Exception e) {
            logger.error("Exception while fetching cluster info", e);
        }
    }

    /**
     * Updates the JobManager deployment status from the Kubernetes {@link Deployment} found in
     * the reconciler context.
     *
     * <ul>
     *   <li>A suspended job is skipped entirely.
     *   <li>DEPLOYED_NOT_READY from the previous pass is promoted to READY.
     *   <li>A deployment with all replicas available and a reachable JobManager port becomes
     *       DEPLOYED_NOT_READY.
     *   <li>A deployment that is present but not available is checked for failures and otherwise
     *       marked DEPLOYING.
     *   <li>An absent deployment is marked MISSING.
     * </ul>
     *
     * @throws DeploymentFailedException if a deployment failure is detected and the status is not
     *     already {@link JobManagerDeploymentStatus#ERROR}
     */
    protected void observeJmDeployment(
            FlinkDeployment flinkApp, Context<?> context, Configuration effectiveConfig) {
        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
        JobManagerDeploymentStatus previousJmStatus =
                deploymentStatus.getJobManagerDeploymentStatus();

        if (isSuspendedJob(flinkApp)) {
            logger.debug("Skipping observe step for suspended application deployments");
            return;
        }

        logger.info(
                "Observing JobManager deployment. Previous status: {}", previousJmStatus.name());

        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
            logger.info("JobManager deployment is ready");
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
            return;
        }

        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
        if (deployment.isPresent()) {
            DeploymentStatus status = deployment.get().getStatus();
            DeploymentSpec spec = deployment.get().getSpec();

            // The port probe is only attempted once Kubernetes reports all replicas available.
            if (allReplicasAvailable(spec, status)
                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
                // The READY transition happens on a later pass (DEPLOYED_NOT_READY branch above).
                logger.info(
                        "JobManager deployment port is ready, waiting for the Flink REST API...");
                deploymentStatus.setJobManagerDeploymentStatus(
                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
                return;
            }

            try {
                checkFailedCreate(status);
                checkContainerBackoff(flinkApp, effectiveConfig);
            } catch (DeploymentFailedException dfe) {
                deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
                // Rethrow only on the first detection; once the status is ERROR the failure is
                // swallowed here.
                if (!JobManagerDeploymentStatus.ERROR.equals(
                        deploymentStatus.getJobManagerDeploymentStatus())) {
                    throw dfe;
                }
                return;
            }

            logger.info("JobManager is being deployed");
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
            return;
        }

        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());

        // Report the missing deployment only when it was not already MISSING or ERROR.
        if (previousJmStatus != JobManagerDeploymentStatus.MISSING
                && previousJmStatus != JobManagerDeploymentStatus.ERROR) {
            onMissingDeployment(flinkApp);
        }
    }

    /**
     * Returns whether the desired replica count equals both the reported and the available
     * replica counts. Any missing spec, status or count yields {@code false} rather than a
     * {@link NullPointerException}.
     */
    private static boolean allReplicasAvailable(DeploymentSpec spec, DeploymentStatus status) {
        if (spec == null || status == null) {
            return false;
        }
        Integer desiredReplicas = spec.getReplicas();
        return desiredReplicas != null
                && desiredReplicas.equals(status.getReplicas())
                && desiredReplicas.equals(status.getAvailableReplicas());
    }

    /**
     * Throws if the deployment reports a {@code ReplicaFailure} condition with reason
     * {@code FailedCreate}. A deployment without status or conditions has nothing to report.
     *
     * @throws DeploymentFailedException built from the first matching condition
     */
    private void checkFailedCreate(DeploymentStatus status) {
        if (status == null) {
            // The caller tolerates a deployment whose status is not populated yet.
            return;
        }
        List<DeploymentCondition> conditions = status.getConditions();
        if (conditions == null) {
            return;
        }
        for (DeploymentCondition dc : conditions) {
            if ("FailedCreate".equals(dc.getReason()) && "ReplicaFailure".equals(dc.getType())) {
                throw new DeploymentFailedException(dc);
            }
        }
    }

    /**
     * Throws if any container of a JobManager pod is waiting with one of the
     * {@link #CONTAINER_BACKOFF_REASONS}. Pods and containers that do not report a status or
     * state yet are skipped.
     *
     * @throws DeploymentFailedException built from the first matching waiting state
     */
    private void checkContainerBackoff(FlinkDeployment flinkApp, Configuration effectiveConfig) {
        PodList jmPods = flinkService.getJmPodList(flinkApp, effectiveConfig);
        for (Pod pod : jmPods.getItems()) {
            if (pod.getStatus() == null) {
                continue;
            }
            for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
                if (cs.getState() == null) {
                    continue;
                }
                ContainerStateWaiting csw = cs.getState().getWaiting();
                // A null reason must not reach the immutable set's contains(), which rejects null.
                if (csw != null
                        && csw.getReason() != null
                        && CONTAINER_BACKOFF_REASONS.contains(csw.getReason())) {
                    throw new DeploymentFailedException(csw);
                }
            }
        }
    }

    /** Returns whether the JobManager deployment status is {@link JobManagerDeploymentStatus#READY}. */
    protected boolean isJmDeploymentReady(FlinkDeployment dep) {
        return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
    }

    /**
     * Clears the status error when the JobManager deployment is not in ERROR, the job state is
     * not FAILED and the last reconciled spec is stable.
     */
    protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment dep) {
        FlinkDeploymentStatus status = dep.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus =
                status.getReconciliationStatus();
        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
                && !JobStatus.FAILED.name().equals(status.getJobStatus().getState())
                && reconciliationStatus.isLastReconciledSpecStable()) {
            status.setError(null);
        }
    }

    /**
     * Returns whether the deployment is a suspended job: the JobManager deployment is MISSING
     * and both the current spec and the last reconciled spec have a job in
     * {@link JobState#SUSPENDED}. Specs without a job are never considered suspended.
     */
    protected boolean isSuspendedJob(FlinkDeployment deployment) {
        JobSpec jobSpec = deployment.getSpec().getJob();
        if (jobSpec == null) {
            return false;
        }
        FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
        FlinkDeploymentSpec lastReconciledSpec =
                deploymentStatus.getReconciliationStatus().deserializeLastReconciledSpec();
        // The last reconciled spec may have no job even when the current one does.
        JobSpec lastReconciledJob =
                lastReconciledSpec == null ? null : lastReconciledSpec.getJob();
        return deploymentStatus.getJobManagerDeploymentStatus()
                        == JobManagerDeploymentStatus.MISSING
                && jobSpec.getState() == JobState.SUSPENDED
                && lastReconciledJob != null
                && lastReconciledJob.getState() == JobState.SUSPENDED;
    }

    /** Logs, records a reconciliation error and triggers a warning event for a missing JobManager. */
    private void onMissingDeployment(FlinkDeployment deployment) {
        String err = "Missing JobManager deployment";
        logger.error(err);
        ReconciliationUtils.updateForReconciliationError(
                deployment,
                new MissingJobManagerException(err),
                this.configManager.getOperatorConfiguration());
        this.eventRecorder.triggerEvent(
                (AbstractFlinkResource<?, ?>) deployment,
                EventRecorder.Type.Warning,
                EventRecorder.Reason.Missing,
                EventRecorder.Component.JobManagerDeployment,
                err);
    }

    /**
     * If the Kubernetes deployment in the context carries a generation annotation equal to the
     * upgrade target generation, marks the upgrade as already deployed and sets the JobManager
     * deployment status to DEPLOYING (and the job state to RECONCILING when the spec has a job).
     * A deployment without annotations is ignored; a mismatching generation is only logged.
     */
    @Override
    protected void updateStatusToDeployedIfAlreadyUpgraded(
            FlinkDeployment flinkDep,
            Context<?> context,
            FlinkDeploymentObserverContext observerContext) {
        FlinkDeploymentStatus status = flinkDep.getStatus();
        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
        depOpt.ifPresent(
                deployment -> {
                    Map<String, String> annotations = deployment.getMetadata().getAnnotations();
                    if (annotations == null) {
                        return;
                    }
                    // -1 stands in for a deployment without the generation annotation.
                    Long deployedGeneration =
                            Optional.ofNullable(annotations.get(GENERATION_ANNOTATION))
                                    .map(Long::valueOf)
                                    .orElse(-1L);
                    Long upgradeTargetGeneration =
                            ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);

                    if (deployedGeneration.equals(upgradeTargetGeneration)) {
                        logger.info("Pending upgrade is already deployed, updating status.");
                        ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
                        if (flinkDep.getSpec().getJob() != null) {
                            status.getJobStatus().setState(JobStatus.RECONCILING.name());
                        }
                        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
                    } else {
                        logger.warn(
                                "Running deployment generation {} doesn't match upgrade target generation {}.",
                                deployedGeneration,
                                upgradeTargetGeneration);
                    }
                });
    }

    /**
     * Observes the Flink cluster itself. Called from {@link #observeInternal} only while the
     * JobManager deployment status is READY.
     */
    protected abstract void observeFlinkCluster(
            FlinkDeployment var1, Context<?> var2, FlinkDeploymentObserverContext var3);
}

