/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer.sessionjob;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.MissingSessionJobException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSessionJobObserver
extends AbstractFlinkResourceObserver<FlinkSessionJob> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobObserver.class);
    private final SessionJobStatusObserver jobStatusObserver;
    private final SnapshotObserver<FlinkSessionJob, FlinkSessionJobStatus> savepointObserver;

    public FlinkSessionJobObserver(EventRecorder eventRecorder) {
        super(eventRecorder);
        this.jobStatusObserver = new SessionJobStatusObserver(eventRecorder);
        this.savepointObserver = new SnapshotObserver(eventRecorder);
    }

    @Override
    protected boolean isResourceReadyToBeObserved(FlinkResourceContext<FlinkSessionJob> ctx) {
        return super.isResourceReadyToBeObserved(ctx) && ctx.getFlinkService() != null;
    }

    @Override
    protected void observeInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
        boolean jobFound = this.jobStatusObserver.observe(ctx);
        if (jobFound) {
            this.savepointObserver.observeSavepointStatus(ctx);
            this.savepointObserver.observeCheckpointStatus(ctx);
        }
    }

    @Override
    protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> ctx) {
        Collection<JobStatusMessage> jobStatusMessages;
        FlinkSessionJob flinkSessionJob = ctx.getResource();
        try {
            jobStatusMessages = ctx.getFlinkService().listJobs(ctx.getObserveConfig());
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to list jobs", e);
        }
        String submittedJobId = ((FlinkSessionJobStatus)flinkSessionJob.getStatus()).getJobStatus().getJobId();
        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
            if (!jobStatusMessage.getJobId().toHexString().equals(submittedJobId)) continue;
            LOG.info("Job with id {} is already deployed.", (Object)submittedJobId);
            return true;
        }
        return false;
    }

    private static class SessionJobStatusObserver
    extends JobStatusObserver<FlinkSessionJob> {
        public SessionJobStatusObserver(EventRecorder eventRecorder) {
            super(eventRecorder);
        }

        @Override
        protected void onTimeout(FlinkResourceContext<FlinkSessionJob> ctx) {
        }

        @Override
        protected Optional<JobStatusMessage> filterTargetJob(org.apache.flink.kubernetes.operator.api.status.JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
            String jobId = (String)Preconditions.checkNotNull((Object)status.getJobId(), (String)"The jobID to be observed should not be null");
            List matchedList = clusterJobStatuses.stream().filter(job -> job.getJobId().toHexString().equals(jobId)).collect(Collectors.toList());
            Preconditions.checkArgument((matchedList.size() <= 1 ? 1 : 0) != 0, (Object)String.format("Expected one job for JobID: %s, but found %d", status.getJobId(), matchedList.size()));
            if (matchedList.size() == 0) {
                LOG.warn("No job found for JobID: {}", (Object)jobId);
                return Optional.empty();
            }
            return Optional.of((JobStatusMessage)matchedList.get(0));
        }

        @Override
        protected void onTargetJobNotFound(FlinkResourceContext<FlinkSessionJob> ctx) {
            this.ifHaDisabledMarkSessionJobMissing(ctx);
        }

        @Override
        protected void onNoJobsFound(FlinkResourceContext<FlinkSessionJob> ctx) {
            this.ifHaDisabledMarkSessionJobMissing(ctx);
        }

        private void ifHaDisabledMarkSessionJobMissing(FlinkResourceContext<FlinkSessionJob> ctx) {
            FlinkSessionJob sessionJob = ctx.getResource();
            if (HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)ctx.getObserveConfig())) {
                return;
            }
            ((FlinkSessionJobStatus)sessionJob.getStatus()).getJobStatus().setState(JobStatus.RECONCILING.name());
            LOG.error("Missing Session Job");
            ReconciliationUtils.updateForReconciliationError(ctx, new MissingSessionJobException("Missing Session Job"));
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)sessionJob, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.Job, "Missing Session Job", ctx.getKubernetesClient());
        }
    }
}

