/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer.deployment;

import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.MissingJobManagerException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base observer for {@link FlinkDeployment} resources.
 *
 * <p>Each observation first tracks the state of the JobManager Kubernetes Deployment and records
 * it in the resource status as a {@link JobManagerDeploymentStatus}. Only once that status is
 * {@code READY} does it delegate to {@link #observeFlinkCluster(FlinkResourceContext)} and refresh
 * the cluster info stored in the status.
 */
public abstract class AbstractFlinkDeploymentObserver
        extends AbstractFlinkResourceObserver<FlinkDeployment> {

    /** Annotation on the JobManager Deployment whose value is read as the deployed generation. */
    private static final String GENERATION_ANNOTATION =
            "flinkdeployment.flink.apache.org/generation";

    /** Container waiting reasons that are reported as a failed deployment. */
    private static final Set<String> CONTAINER_FAILURE_REASONS =
            Set.of("CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull");

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Creates the observer.
     *
     * @param eventRecorder recorder passed to the parent observer and used to emit events
     */
    public AbstractFlinkDeploymentObserver(EventRecorder eventRecorder) {
        super(eventRecorder);
    }

    /**
     * Runs one observation pass for the deployment.
     *
     * <p>The readiness check is repeated before every step because the preceding step may have
     * updated the JobManager deployment status stored in the resource.
     *
     * @param ctx context of the resource being observed
     */
    @Override
    public void observeInternal(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment flinkDep = ctx.getResource();
        if (!isJmDeploymentReady(flinkDep)) {
            observeJmDeployment(ctx);
        }
        if (isJmDeploymentReady(flinkDep)) {
            observeFlinkCluster(ctx);
        }
        if (isJmDeploymentReady(flinkDep)) {
            observeClusterInfo(ctx);
        }
        clearErrorsIfDeploymentIsHealthy(flinkDep);
    }

    /**
     * Fetches the cluster info from the Flink service and merges it into the resource status.
     * Any failure is logged as a warning and otherwise ignored (best effort).
     */
    private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment flinkApp = ctx.getResource();
        try {
            Map<String, String> clusterInfo =
                    ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
            flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
            logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
        } catch (Exception e) {
            logger.warn("Exception while fetching cluster info", e);
        }
    }

    /**
     * Observes the JobManager Kubernetes Deployment and updates the JobManager deployment status.
     *
     * <ul>
     *   <li>Suspended jobs (see {@link #isSuspendedJob(FlinkDeployment)}) are skipped.
     *   <li>A previous status of {@code DEPLOYED_NOT_READY} is promoted to {@code READY}.
     *   <li>If the Deployment exists, all desired replicas are available and the JobManager port
     *       is ready, the status becomes {@code DEPLOYED_NOT_READY}.
     *   <li>If the Deployment exists but is not yet available, it is checked for creation
     *       failures and container back-off; without failures the status becomes
     *       {@code DEPLOYING}.
     *   <li>If the Deployment does not exist, the status becomes {@code MISSING}.
     * </ul>
     *
     * @param ctx context of the resource being observed
     * @throws DeploymentFailedException if a deployment failure is detected and the current
     *     JobManager deployment status is not already {@code ERROR}
     */
    protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment flinkApp = ctx.getResource();
        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
        JobManagerDeploymentStatus previousJmStatus =
                deploymentStatus.getJobManagerDeploymentStatus();

        if (isSuspendedJob(flinkApp)) {
            logger.debug("Skipping observe step for suspended application deployments");
            return;
        }

        logger.info(
                "Observing JobManager deployment. Previous status: {}", previousJmStatus.name());

        if (previousJmStatus == JobManagerDeploymentStatus.DEPLOYED_NOT_READY) {
            logger.info("JobManager deployment is ready");
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
            return;
        }

        Optional<Deployment> deployment =
                ctx.getJosdkContext().getSecondaryResource(Deployment.class);
        if (!deployment.isPresent()) {
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
            deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            // Report the missing deployment only on the transition into MISSING; skip it when
            // the previous status was already MISSING or ERROR.
            if (previousJmStatus != JobManagerDeploymentStatus.MISSING
                    && previousJmStatus != JobManagerDeploymentStatus.ERROR) {
                onMissingDeployment(ctx);
            }
            return;
        }

        DeploymentStatus status = deployment.get().getStatus();
        DeploymentSpec spec = deployment.get().getSpec();
        // The port probe is only attempted once all replicas are reported available.
        if (allReplicasAvailable(spec, status)
                && ctx.getFlinkService().isJobManagerPortReady(ctx.getObserveConfig())) {
            logger.info("JobManager deployment port is ready, waiting for the Flink REST API...");
            deploymentStatus.setJobManagerDeploymentStatus(
                    JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
            return;
        }

        try {
            checkFailedCreate(status);
            checkContainerBackoff(ctx);
        } catch (DeploymentFailedException dfe) {
            deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
            // Propagate the failure only if the status is not already ERROR.
            if (deploymentStatus.getJobManagerDeploymentStatus()
                    != JobManagerDeploymentStatus.ERROR) {
                throw dfe;
            }
            return;
        }

        logger.info("JobManager is being deployed");
        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
    }

    /**
     * Returns whether the Deployment reports both its total and its available replica counts
     * equal to the desired replica count. Any missing value yields {@code false}.
     */
    private static boolean allReplicasAvailable(DeploymentSpec spec, DeploymentStatus status) {
        if (spec == null || status == null) {
            return false;
        }
        Integer desired = spec.getReplicas();
        // Integer.equals is null-safe on the argument, so unset status counters are "not ready".
        return desired != null
                && desired.equals(status.getReplicas())
                && desired.equals(status.getAvailableReplicas());
    }

    /**
     * Throws if the Deployment carries a {@code ReplicaFailure} condition with reason
     * {@code FailedCreate}. A missing status or condition list is treated as "no failure".
     */
    private void checkFailedCreate(DeploymentStatus status) {
        if (status == null) {
            return;
        }
        List<DeploymentCondition> conditions = status.getConditions();
        if (conditions == null) {
            return;
        }
        for (DeploymentCondition dc : conditions) {
            if ("FailedCreate".equals(dc.getReason()) && "ReplicaFailure".equals(dc.getType())) {
                throw new DeploymentFailedException(dc);
            }
        }
    }

    /**
     * Throws if any container of a JobManager pod is waiting with one of the
     * {@link #CONTAINER_FAILURE_REASONS}. Pods without a status and containers without a state
     * or waiting reason are skipped.
     */
    private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> ctx) {
        PodList jmPods =
                ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig());
        for (Pod pod : jmPods.getItems()) {
            if (pod.getStatus() == null || pod.getStatus().getContainerStatuses() == null) {
                continue;
            }
            for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
                if (cs.getState() == null) {
                    continue;
                }
                ContainerStateWaiting csw = cs.getState().getWaiting();
                if (csw == null) {
                    continue;
                }
                String reason = csw.getReason();
                // Sets created by Set.of reject null in contains(), so guard explicitly.
                if (reason != null && CONTAINER_FAILURE_REASONS.contains(reason)) {
                    throw new DeploymentFailedException(csw);
                }
            }
        }
    }

    /**
     * Returns whether the recorded JobManager deployment status is {@code READY}.
     *
     * @param dep deployment whose status is inspected
     * @return {@code true} if the JobManager deployment status equals {@code READY}
     */
    protected boolean isJmDeploymentReady(FlinkDeployment dep) {
        return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
    }

    /**
     * Clears the error stored in the status when the JobManager deployment status is not
     * {@code ERROR}, the job state is not {@code FAILED} and the last reconciled spec is stable.
     *
     * @param dep deployment whose status error may be cleared
     */
    protected void clearErrorsIfDeploymentIsHealthy(FlinkDeployment dep) {
        FlinkDeploymentStatus status = dep.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus = status.getReconciliationStatus();

        boolean jmInError =
                status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR;
        boolean jobFailed = JobStatus.FAILED.name().equals(status.getJobStatus().getState());

        if (!jmInError && !jobFailed && reconciliationStatus.isLastReconciledSpecStable()) {
            status.setError(null);
        }
    }

    /**
     * Returns whether the deployment is an intentionally suspended job: it has a job spec, the
     * JobManager deployment status is {@code MISSING}, and both the current spec and the last
     * reconciled spec have job state {@code SUSPENDED}.
     *
     * @param deployment deployment to inspect
     * @return {@code true} if all of the above conditions hold, {@code false} otherwise
     *     (including when there is no job spec or no last reconciled job spec)
     */
    protected boolean isSuspendedJob(FlinkDeployment deployment) {
        JobSpec jobSpec = deployment.getSpec().getJob();
        if (jobSpec == null) {
            return false;
        }

        FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
        FlinkDeploymentSpec lastReconciledSpec =
                deploymentStatus.getReconciliationStatus().deserializeLastReconciledSpec();

        return deploymentStatus.getJobManagerDeploymentStatus()
                        == JobManagerDeploymentStatus.MISSING
                && jobSpec.getState() == JobState.SUSPENDED
                && lastReconciledSpec != null
                && lastReconciledSpec.getJob() != null
                && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
    }

    /**
     * Logs the missing JobManager deployment, records it as a reconciliation error on the
     * resource and triggers a warning event.
     */
    private void onMissingDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
        String err = "Missing JobManager deployment";
        logger.error(err);
        ReconciliationUtils.updateForReconciliationError(ctx, new MissingJobManagerException(err));
        eventRecorder.triggerEvent(
                ctx.getResource(),
                EventRecorder.Type.Warning,
                EventRecorder.Reason.Missing,
                EventRecorder.Component.JobManagerDeployment,
                err,
                ctx.getKubernetesClient());
    }

    /**
     * Checks whether the pending upgrade has already been deployed.
     *
     * <p>Returns {@code true}, and sets the JobManager deployment status to {@code DEPLOYING},
     * only if the recorded status is {@code MISSING}, a Deployment secondary resource exists that
     * is not marked for deletion, and its generation annotation equals the upgrade target
     * generation. A Deployment without the generation annotation is treated as generation
     * {@code -1}.
     *
     * @param ctx context of the resource being observed
     * @return {@code true} if the running Deployment already matches the upgrade target
     */
    @Override
    protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment flinkDep = ctx.getResource();
        FlinkDeploymentStatus status = flinkDep.getStatus();
        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            return false;
        }

        Optional<Deployment> depOpt = ctx.getJosdkContext().getSecondaryResource(Deployment.class);
        if (!depOpt.isPresent()) {
            return false;
        }

        Deployment deployment = depOpt.get();
        if (deployment.isMarkedForDeletion()) {
            logger.debug("Deployment already marked for deletion, ignoring...");
            return false;
        }

        Map<String, String> annotations = deployment.getMetadata().getAnnotations();
        if (annotations == null) {
            logger.warn(
                    "Running deployment doesn't have any annotations. This could indicate a deployment error.");
            return false;
        }

        Long deployedGeneration =
                Optional.ofNullable(annotations.get(GENERATION_ANNOTATION))
                        .map(Long::valueOf)
                        .orElse(-1L);
        Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);

        if (deployedGeneration.equals(upgradeTargetGeneration)) {
            logger.info("Pending upgrade is already deployed, updating status.");
            status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
            return true;
        }

        logger.warn(
                "Running deployment generation {} doesn't match upgrade target generation {}.",
                deployedGeneration,
                upgradeTargetGeneration);
        return false;
    }

    /**
     * Observes the Flink cluster itself. Called from
     * {@link #observeInternal(FlinkResourceContext)} only when the JobManager deployment status
     * is {@code READY}.
     *
     * @param ctx context of the resource being observed
     */
    protected abstract void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx);
}

