/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
    public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
    protected final EventRecorder eventRecorder;

    public JobStatusObserver(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    public boolean observe(FlinkResourceContext<R> ctx) {
        block4: {
            R resource = ctx.getResource();
            if (((CommonStatus)resource.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState() == JobState.SUSPENDED) {
                return false;
            }
            org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((CommonStatus)resource.getStatus()).getJobStatus();
            LOG.debug("Observing job status");
            JobStatus previousJobStatus = jobStatus.getState();
            try {
                Optional<JobStatusMessage> newJobStatusOpt = ctx.getFlinkService().getJobStatus(ctx.getObserveConfig(), JobID.fromHexString((String)jobStatus.getJobId()));
                if (newJobStatusOpt.isPresent()) {
                    this.updateJobStatus(ctx, newJobStatusOpt.get());
                    ReconciliationUtils.checkAndUpdateStableSpec((CommonStatus)resource.getStatus());
                    return true;
                }
                this.onTargetJobNotFound(ctx);
            }
            catch (Exception e) {
                LOG.warn("Exception while getting job status", (Throwable)e);
                this.ifRunningMoveToReconciling(jobStatus, previousJobStatus);
                if (!(e instanceof TimeoutException)) break block4;
                this.onTimeout(ctx);
            }
        }
        return false;
    }

    protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
        R resource = ctx.getResource();
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((CommonStatus)resource.getStatus()).getJobStatus();
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)resource, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.Job, JOB_NOT_FOUND_ERR, ctx.getKubernetesClient());
        if (resource instanceof FlinkSessionJob && !ReconciliationUtils.isJobInTerminalState((CommonStatus)resource.getStatus()) && ((AbstractFlinkSpec)resource.getSpec()).getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
            JobStatusObserver.markSuspended(resource);
        } else {
            ((CommonStatus)resource.getStatus()).getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
        }
        jobStatus.setState(JobStatus.RECONCILING);
        ((CommonStatus)resource.getStatus()).setError(JOB_NOT_FOUND_ERR);
    }

    private void ifRunningMoveToReconciling(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus, JobStatus previousJobStatus) {
        if (JobStatus.RUNNING == previousJobStatus) {
            jobStatus.setState(JobStatus.RECONCILING);
        }
    }

    protected void onTimeout(FlinkResourceContext<R> ctx) {
    }

    private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
        R resource = ctx.getResource();
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((CommonStatus)resource.getStatus()).getJobStatus();
        JobStatus previousJobStatus = jobStatus.getState();
        JobStatus currentJobStatus = clusterJobStatus.getJobState();
        jobStatus.setState(currentJobStatus);
        jobStatus.setJobName(clusterJobStatus.getJobName());
        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
        if (jobStatus.getState().equals((Object)previousJobStatus)) {
            LOG.debug("Job status ({}) unchanged", (Object)previousJobStatus);
        } else {
            String message;
            jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
            String string = message = previousJobStatus == null ? String.format("Job status changed to %s", jobStatus.getState()) : String.format("Job status changed from %s to %s", previousJobStatus, jobStatus.getState());
            if (JobStatus.CANCELED == currentJobStatus || currentJobStatus.isGloballyTerminalState() && JobStatus.CANCELLING.equals((Object)previousJobStatus)) {
                JobStatusObserver.markSuspended(resource);
            }
            this.setErrorIfPresent(ctx, clusterJobStatus);
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)resource, EventRecorder.Type.Normal, EventRecorder.Reason.JobStatusChanged, EventRecorder.Component.Job, message, ctx.getKubernetesClient());
        }
    }

    private static void markSuspended(AbstractFlinkResource<?, ?> resource) {
        LOG.info("Marking suspended");
        ReconciliationUtils.updateLastReconciledSpec(resource, (s, m) -> {
            s.getJob().setState(JobState.SUSPENDED);
            m.setFirstDeployment(false);
        });
    }

    private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
        if (clusterJobStatus.getJobState() == JobStatus.FAILED) {
            try {
                JobResult result = ctx.getFlinkService().requestJobResult(ctx.getObserveConfig(), clusterJobStatus.getJobId());
                result.getSerializedThrowable().ifPresent(t -> {
                    FlinkResourceExceptionUtils.updateFlinkResourceException((Throwable)t, ctx.getResource(), ctx.getOperatorConfig());
                    LOG.error("Job {} failed with error: {}", (Object)clusterJobStatus.getJobId(), (Object)t.getFullStringifiedStackTrace());
                });
            }
            catch (Exception e) {
                LOG.warn("Failed to request the job result", (Throwable)e);
            }
        }
    }
}

