/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.observer.ObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base observer that lists the jobs known to a Flink cluster and copies the state of the target
 * job into the job status stored on the observed resource.
 *
 * <p>Subclasses decide which listed job is the target ({@link #filterTargetJob}) and how to react
 * when it cannot be found ({@link #onTargetJobNotFound}), when the cluster reports no jobs at all
 * ({@link #onNoJobsFound}), or when listing the jobs times out ({@link #onTimeout}).
 *
 * @param <R> type of the observed Flink resource
 * @param <CTX> type of the observer context that supplies the deployed configuration
 */
public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>, CTX extends ObserverContext> {
    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
    // Not referenced anywhere in this class; retained as-is.
    private static final int MAX_ERROR_STRING_LENGTH = 512;
    /** Error text for a missing session job; not referenced inside this class. */
    public static final String MISSING_SESSION_JOB_ERR = "Missing Session Job";
    protected final FlinkService flinkService;
    protected final EventRecorder eventRecorder;
    protected final FlinkConfigManager configManager;

    /**
     * Creates the observer with its collaborators.
     *
     * @param flinkService service used to list jobs and to request job results
     * @param configManager source of the operator configuration used when recording job errors
     * @param eventRecorder recorder used to emit an event whenever the job state changes
     */
    public JobStatusObserver(FlinkService flinkService, FlinkConfigManager configManager, EventRecorder eventRecorder) {
        this.flinkService = flinkService;
        this.eventRecorder = eventRecorder;
        this.configManager = configManager;
    }

    /**
     * Observes the job status of the given resource.
     *
     * <p>Whenever the target job cannot be observed (listing failed, no jobs on the cluster, or no
     * matching job) and the previously recorded state was {@code RUNNING}, the recorded state is
     * moved to {@code RECONCILING}.
     *
     * @param resource the resource whose job status is read and updated
     * @param resourceContext operator SDK context, forwarded to {@link #onTimeout} only
     * @param ctx observer context providing the deployed configuration
     * @return {@code true} if the target job was found and its status was written to the resource,
     *     {@code false} in every other case
     */
    public boolean observe(R resource, Context resourceContext, CTX ctx) {
        org.apache.flink.kubernetes.operator.api.status.JobStatus trackedStatus =
                ((CommonStatus) resource.getStatus()).getJobStatus();
        LOG.info("Observing job status");
        String stateBeforeObserve = trackedStatus.getState();

        List<JobStatusMessage> jobsOnCluster;
        try {
            jobsOnCluster = new ArrayList<>(this.flinkService.listJobs(ctx.getDeployedConfig()));
        } catch (Exception listingFailure) {
            LOG.error("Exception while listing jobs", listingFailure);
            this.ifRunningMoveToReconciling(trackedStatus, stateBeforeObserve);
            // Only timeouts get a dedicated callback; any other failure just ends the observation.
            if (listingFailure instanceof TimeoutException) {
                this.onTimeout(resource, resourceContext, ctx);
            }
            return false;
        }

        if (jobsOnCluster.isEmpty()) {
            LOG.debug("No jobs found on the cluster");
            this.ifRunningMoveToReconciling(trackedStatus, stateBeforeObserve);
            this.onNoJobsFound(resource, ctx.getDeployedConfig());
            return false;
        }

        Optional<JobStatusMessage> matchingJob = this.filterTargetJob(trackedStatus, jobsOnCluster);
        if (matchingJob.isPresent()) {
            this.updateJobStatus(resource, matchingJob.get(), ctx.getDeployedConfig());
            ReconciliationUtils.checkAndUpdateStableSpec((CommonStatus) resource.getStatus());
            return true;
        }

        LOG.warn("No matching jobs found on the cluster");
        this.ifRunningMoveToReconciling(trackedStatus, stateBeforeObserve);
        this.onTargetJobNotFound(resource, ctx.getDeployedConfig());
        return false;
    }

    /**
     * Callback invoked when the cluster lists at least one job but none of them is selected by
     * {@link #filterTargetJob}.
     *
     * @param var1 the observed resource
     * @param var2 the deployed configuration taken from the observer context
     */
    protected abstract void onTargetJobNotFound(R var1, Configuration var2);

    /**
     * Callback invoked when the cluster lists no jobs at all. Does nothing by default.
     *
     * @param resource the observed resource
     * @param config the deployed configuration taken from the observer context
     */
    protected void onNoJobsFound(R resource, Configuration config) {
    }

    /**
     * Sets the recorded state to {@code RECONCILING} if, and only if, the previously recorded state
     * was {@code RUNNING}.
     *
     * @param jobStatus the status object to update
     * @param previousJobStatus the state recorded before the current observation, may be null
     */
    private void ifRunningMoveToReconciling(org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus, String previousJobStatus) {
        boolean wasRunning = JobStatus.RUNNING.name().equals(previousJobStatus);
        if (!wasRunning) {
            return;
        }
        jobStatus.setState(JobStatus.RECONCILING.name());
    }

    /**
     * Callback invoked when listing the cluster's jobs fails with a {@link TimeoutException}.
     *
     * @param var1 the observed resource
     * @param var2 the operator SDK context passed to {@link #observe}
     * @param var3 the observer context passed to {@link #observe}
     */
    protected abstract void onTimeout(R var1, Context<?> var2, CTX var3);

    /**
     * Selects the job belonging to the observed resource from the jobs listed on the cluster.
     *
     * @param var1 the job status currently recorded on the resource
     * @param var2 the non-empty list of jobs reported by the cluster
     * @return the matching job, or an empty {@link Optional} if none matches
     */
    protected abstract Optional<JobStatusMessage> filterTargetJob(org.apache.flink.kubernetes.operator.api.status.JobStatus var1, List<JobStatusMessage> var2);

    /**
     * Copies state, name, id and start time of the cluster job into the resource's job status. If
     * the state differs from the previously recorded one, the update time is refreshed, a failure
     * cause is recorded when available, and a {@code JobStatusChanged} event is emitted.
     */
    private void updateJobStatus(R resource, JobStatusMessage clusterJobStatus, Configuration deployedConfig) {
        org.apache.flink.kubernetes.operator.api.status.JobStatus trackedStatus =
                ((CommonStatus) resource.getStatus()).getJobStatus();
        String oldState = trackedStatus.getState();

        trackedStatus.setState(clusterJobStatus.getJobState().name());
        trackedStatus.setJobName(clusterJobStatus.getJobName());
        trackedStatus.setJobId(clusterJobStatus.getJobId().toHexString());
        trackedStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));

        String newState = trackedStatus.getState();
        if (newState.equals(oldState)) {
            LOG.info("Job status ({}) unchanged", oldState);
            return;
        }

        trackedStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));

        String message;
        if (oldState == null) {
            // First observation: there is no earlier state to report.
            message = String.format("Job status changed to %s", newState);
        } else {
            message = String.format("Job status changed from %s to %s", oldState, newState);
        }
        LOG.info(message);

        this.setErrorIfPresent(resource, clusterJobStatus, deployedConfig);
        this.eventRecorder.triggerEvent(
                (AbstractFlinkResource<?, ?>) resource,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.JobStatusChanged,
                EventRecorder.Component.Job,
                message);
    }

    /**
     * For a job in state {@code FAILED}, requests the job result and, if it carries a throwable,
     * records it on the resource and logs it. A failure to fetch the job result is logged as a
     * warning and otherwise ignored.
     */
    private void setErrorIfPresent(R resource, JobStatusMessage clusterJobStatus, Configuration deployedConfig) {
        if (clusterJobStatus.getJobState() != JobStatus.FAILED) {
            return;
        }
        try {
            JobResult jobResult = this.flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
            jobResult.getSerializedThrowable().ifPresent(failureCause -> {
                FlinkResourceExceptionUtils.updateFlinkResourceException(
                        (Throwable) failureCause, resource, this.configManager.getOperatorConfiguration());
                LOG.error(
                        "Job {} failed with error: {}",
                        clusterJobStatus.getJobId(),
                        failureCause.getFullStringifiedStackTrace());
            });
        } catch (Exception requestFailure) {
            // Best effort: the status update itself must not fail because the result is unavailable.
            LOG.warn("Failed to request the job result", requestFailure);
        }
    }
}

