/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer.sessionjob;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.MissingSessionJobException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.sessionjob.FlinkSessionJobObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSessionJobObserver
extends AbstractFlinkResourceObserver<FlinkSessionJob, FlinkSessionJobObserverContext> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobObserver.class);
    private final FlinkServiceFactory flinkServiceFactory;

    public FlinkSessionJobObserver(FlinkServiceFactory flinkServiceFactory, FlinkConfigManager configManager, EventRecorder eventRecorder) {
        super(configManager, eventRecorder);
        this.flinkServiceFactory = flinkServiceFactory;
    }

    @Override
    protected FlinkSessionJobObserverContext getObserverContext(FlinkSessionJob resource, Context<?> context) {
        return new FlinkSessionJobObserverContext(resource, context, this.flinkServiceFactory, this.configManager);
    }

    @Override
    protected boolean isResourceReadyToBeObserved(FlinkSessionJob resource, Context<?> context, FlinkSessionJobObserverContext observerContext) {
        return super.isResourceReadyToBeObserved(resource, context, observerContext) && observerContext.isReadyToReconcile();
    }

    @Override
    protected void observeInternal(FlinkSessionJob flinkSessionJob, Context<?> ctx, FlinkSessionJobObserverContext observerContext) {
        SessionJobStatusObserver jobStatusObserver = new SessionJobStatusObserver(observerContext, this.configManager, this.eventRecorder);
        boolean jobFound = jobStatusObserver.observe(flinkSessionJob, ctx, observerContext);
        if (jobFound) {
            SavepointObserver savepointObserver = new SavepointObserver(observerContext.getFlinkService(), this.configManager, this.eventRecorder);
            savepointObserver.observeSavepointStatus(flinkSessionJob, observerContext.getDeployedConfig());
        }
    }

    @Override
    protected void updateStatusToDeployedIfAlreadyUpgraded(FlinkSessionJob flinkSessionJob, Context<?> ctx, FlinkSessionJobObserverContext observerContext) {
        Collection<JobStatusMessage> jobStatusMessages;
        String uid = flinkSessionJob.getMetadata().getUid();
        try {
            jobStatusMessages = observerContext.getFlinkService().listJobs(observerContext.getDeployedConfig());
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to list jobs", e);
        }
        ArrayList<JobID> matchedJobs = new ArrayList<JobID>();
        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
            JobID jobId = jobStatusMessage.getJobId();
            if (jobId.getLowerPart() != (long)uid.hashCode() || jobStatusMessage.getJobState().isGloballyTerminalState()) continue;
            matchedJobs.add(jobId);
        }
        if (matchedJobs.isEmpty()) {
            return;
        }
        if (matchedJobs.size() > 1) {
            throw new RuntimeException(String.format("Unexpected case: %d job found for the resource with uid: %s", matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
        }
        JobID matchedJobID = (JobID)matchedJobs.get(0);
        Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
        long deployedGeneration = matchedJobID.getUpperPart();
        String oldJobID = ((FlinkSessionJobStatus)flinkSessionJob.getStatus()).getJobStatus().getJobId();
        if (upgradeTargetGeneration != deployedGeneration) {
            String msg = String.format("Running job %s's generation %s doesn't match upgrade target generation %s.", matchedJobID.toHexString(), deployedGeneration, upgradeTargetGeneration);
            throw new RuntimeException(msg);
        }
        LOG.info("Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}", (Object)oldJobID, (Object)matchedJobID.toHexString());
        ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
        ((FlinkSessionJobStatus)flinkSessionJob.getStatus()).getJobStatus().setState(JobStatus.RECONCILING.name());
        ((FlinkSessionJobStatus)flinkSessionJob.getStatus()).getJobStatus().setJobId(matchedJobID.toHexString());
    }

    private static class SessionJobStatusObserver
    extends JobStatusObserver<FlinkSessionJob, FlinkSessionJobObserverContext> {
        public SessionJobStatusObserver(FlinkSessionJobObserverContext observerContext, FlinkConfigManager configManager, EventRecorder eventRecorder) {
            super(observerContext.getFlinkService(), configManager, eventRecorder);
        }

        @Override
        protected void onTimeout(FlinkSessionJob sessionJob, Context<?> ctx, FlinkSessionJobObserverContext sessionJobObserverContext) {
        }

        @Override
        protected Optional<JobStatusMessage> filterTargetJob(org.apache.flink.kubernetes.operator.api.status.JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
            String jobId = (String)Preconditions.checkNotNull((Object)status.getJobId(), (String)"The jobID to be observed should not be null");
            List matchedList = clusterJobStatuses.stream().filter(job -> job.getJobId().toHexString().equals(jobId)).collect(Collectors.toList());
            Preconditions.checkArgument((matchedList.size() <= 1 ? 1 : 0) != 0, (Object)String.format("Expected one job for JobID: %s, but %d founded", status.getJobId(), matchedList.size()));
            if (matchedList.size() == 0) {
                LOG.warn("No job found for JobID: {}", (Object)jobId);
                return Optional.empty();
            }
            return Optional.of((JobStatusMessage)matchedList.get(0));
        }

        @Override
        protected void onTargetJobNotFound(FlinkSessionJob resource, Configuration config) {
            this.ifHaDisabledMarkSessionJobMissing(resource, config);
        }

        @Override
        protected void onNoJobsFound(FlinkSessionJob resource, Configuration config) {
            this.ifHaDisabledMarkSessionJobMissing(resource, config);
        }

        private void ifHaDisabledMarkSessionJobMissing(FlinkSessionJob sessionJob, Configuration conf) {
            if (HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)conf)) {
                return;
            }
            ((FlinkSessionJobStatus)sessionJob.getStatus()).getJobStatus().setState(JobStatus.RECONCILING.name());
            LOG.error("Missing Session Job");
            ReconciliationUtils.updateForReconciliationError(sessionJob, new MissingSessionJobException("Missing Session Job"), this.configManager.getOperatorConfiguration());
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)sessionJob, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.Job, "Missing Session Job");
        }
    }
}

