/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base observer that reads the jobs reported by the Flink service for a resource and copies the
 * state of the matching job into the resource's job status.
 *
 * <p>Subclasses decide which of the listed jobs belongs to the resource ({@link
 * #filterTargetJob}) and how to react when the job cannot be observed ({@link #onTimeout},
 * {@link #onTargetJobNotFound}, {@link #onNoJobsFound}).
 *
 * @param <R> the Flink resource type being observed
 */
public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);

    /**
     * Error text for a missing session job. Not referenced inside this class; presumably used by
     * subclasses or callers (verify before changing).
     */
    public static final String MISSING_SESSION_JOB_ERR = "Missing Session Job";

    /** Recorder used to emit an event whenever the observed job status changes. */
    protected final EventRecorder eventRecorder;

    /**
     * Creates the observer.
     *
     * @param eventRecorder recorder used for job status change events
     */
    public JobStatusObserver(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    /**
     * Observes the job status of the resource held by {@code ctx}.
     *
     * <p>Lists the jobs through the context's Flink service, picks the target job with {@link
     * #filterTargetJob} and writes its state into the resource status. Whenever the target job
     * cannot be observed (listing failed, no jobs listed, or no matching job), a previously
     * {@code RUNNING} state is switched to {@code RECONCILING} and the corresponding hook is
     * invoked.
     *
     * @param ctx context of the resource being observed
     * @return {@code true} if the target job was found and the status was updated, {@code false}
     *     otherwise
     */
    public boolean observe(FlinkResourceContext<R> ctx) {
        R resource = ctx.getResource();
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((CommonStatus)resource.getStatus()).getJobStatus();
        LOG.debug("Observing job status");
        String previousJobStatus = jobStatus.getState();

        List<JobStatusMessage> clusterJobStatuses;
        try {
            // Copy into a list owned by this method before handing it to filterTargetJob.
            clusterJobStatuses = new ArrayList<JobStatusMessage>(ctx.getFlinkService().listJobs(ctx.getObserveConfig()));
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise swallow the interruption; restore the flag so
                // code further up the stack can still see that the thread was interrupted.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Exception while listing jobs", e);
            this.ifRunningMoveToReconciling(jobStatus, previousJobStatus);
            if (e instanceof TimeoutException) {
                this.onTimeout(ctx);
            }
            return false;
        }

        if (clusterJobStatuses.isEmpty()) {
            LOG.debug("No jobs found on the cluster");
            this.ifRunningMoveToReconciling(jobStatus, previousJobStatus);
            this.onNoJobsFound(ctx);
            return false;
        }

        Optional<JobStatusMessage> targetJobStatusMessage = this.filterTargetJob(jobStatus, clusterJobStatuses);
        if (targetJobStatusMessage.isEmpty()) {
            LOG.warn("No matching jobs found on the cluster");
            this.ifRunningMoveToReconciling(jobStatus, previousJobStatus);
            this.onTargetJobNotFound(ctx);
            return false;
        }

        this.updateJobStatus(ctx, targetJobStatusMessage.get());
        ReconciliationUtils.checkAndUpdateStableSpec((CommonStatus)resource.getStatus());
        return true;
    }

    /**
     * Hook invoked when the cluster reported at least one job but {@link #filterTargetJob}
     * returned an empty result.
     *
     * @param ctx context of the resource being observed
     */
    protected abstract void onTargetJobNotFound(FlinkResourceContext<R> ctx);

    /**
     * Hook invoked when the job listing came back empty. Does nothing by default.
     *
     * @param ctx context of the resource being observed
     */
    protected void onNoJobsFound(FlinkResourceContext<R> ctx) {
    }

    /**
     * Switches the job state to {@code RECONCILING} if the state recorded before this
     * observation was {@code RUNNING}; any other previous state is left untouched.
     *
     * @param jobStatus job status object to modify
     * @param previousJobStatus state name read before the observation started, may be null
     */
    private void ifRunningMoveToReconciling(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus, String previousJobStatus) {
        if (JobStatus.RUNNING.name().equals(previousJobStatus)) {
            jobStatus.setState(JobStatus.RECONCILING.name());
        }
    }

    /**
     * Hook invoked when listing the jobs failed with a {@link TimeoutException}.
     *
     * @param ctx context of the resource being observed
     */
    protected abstract void onTimeout(FlinkResourceContext<R> ctx);

    /**
     * Selects the job status message that belongs to the observed resource.
     *
     * @param jobStatus job status currently stored on the resource
     * @param clusterJobStatuses non-empty list of job status messages returned by the job listing
     * @return the matching message, or an empty {@link Optional} if none matches
     */
    protected abstract Optional<JobStatusMessage> filterTargetJob(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus, List<JobStatusMessage> clusterJobStatuses);

    /**
     * Copies state, name, id and start time from {@code clusterJobStatus} into the resource's job
     * status. If the job id or the state differs from the previous values, the update time is
     * refreshed, a failure cause is recorded when available, and a status change event is
     * emitted.
     *
     * @param ctx context of the resource being observed
     * @param clusterJobStatus status message of the target job
     */
    private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
        R resource = ctx.getResource();
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((CommonStatus)resource.getStatus()).getJobStatus();
        // Remember the old values before overwriting them so a change can be detected below.
        String previousJobId = jobStatus.getJobId();
        String previousJobStatus = jobStatus.getState();

        jobStatus.setState(clusterJobStatus.getJobState().name());
        jobStatus.setJobName(clusterJobStatus.getJobName());
        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));

        boolean unchanged = jobStatus.getJobId().equals(previousJobId) && jobStatus.getState().equals(previousJobStatus);
        if (unchanged) {
            LOG.debug("Job status ({}) unchanged", previousJobStatus);
            return;
        }

        jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
        String message;
        if (previousJobStatus == null) {
            message = String.format("Job status changed to %s", jobStatus.getState());
        } else {
            message = String.format("Job status changed from %s to %s", previousJobStatus, jobStatus.getState());
        }
        this.setErrorIfPresent(ctx, clusterJobStatus);
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)resource, EventRecorder.Type.Normal, EventRecorder.Reason.JobStatusChanged, EventRecorder.Component.Job, message, ctx.getKubernetesClient());
    }

    /**
     * For a {@code FAILED} job, requests the job result and, if it carries a serialized
     * throwable, passes it to {@link FlinkResourceExceptionUtils#updateFlinkResourceException}
     * and logs it. This is best effort: a failure to fetch the result is only logged.
     *
     * @param ctx context of the resource being observed
     * @param clusterJobStatus status message of the target job
     */
    private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
        if (clusterJobStatus.getJobState() != JobStatus.FAILED) {
            return;
        }
        try {
            JobResult result = ctx.getFlinkService().requestJobResult(ctx.getObserveConfig(), clusterJobStatus.getJobId());
            result.getSerializedThrowable().ifPresent(t -> {
                FlinkResourceExceptionUtils.updateFlinkResourceException((Throwable)t, ctx.getResource(), ctx.getOperatorConfig());
                LOG.error("Job {} failed with error: {}", clusterJobStatus.getJobId(), t.getFullStringifiedStackTrace());
            });
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller even though the error is ignored.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Failed to request the job result", e);
        }
    }
}

