/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer.sessionjob;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Observer for {@link FlinkSessionJob} resources.
 *
 * <p>Delegates job status observation to a {@link JobStatusObserver} and, when that observer
 * reports the job as found, savepoint and checkpoint observation to a {@link SnapshotObserver}.
 */
public class FlinkSessionJobObserver
        extends AbstractFlinkResourceObserver<FlinkSessionJob> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobObserver.class);

    private final JobStatusObserver<FlinkSessionJob> jobStatusObserver;
    private final SnapshotObserver<FlinkSessionJob, FlinkSessionJobStatus> savepointObserver;

    /**
     * Creates the observer together with its delegate observers.
     *
     * @param eventRecorder recorder handed to the superclass and to both delegate observers
     */
    public FlinkSessionJobObserver(EventRecorder eventRecorder) {
        super(eventRecorder);
        // Diamond instead of raw types so the delegates stay type-checked against the fields.
        this.jobStatusObserver = new JobStatusObserver<>(eventRecorder);
        this.savepointObserver = new SnapshotObserver<>(eventRecorder);
    }

    /**
     * Adds a requirement on top of the superclass check: the context must provide a Flink
     * service.
     *
     * @param ctx context of the session job being observed
     * @return {@code true} when the superclass check passes and {@code ctx.getFlinkService()} is
     *     non-null
     */
    @Override
    protected boolean isResourceReadyToBeObserved(FlinkResourceContext<FlinkSessionJob> ctx) {
        return super.isResourceReadyToBeObserved(ctx) && ctx.getFlinkService() != null;
    }

    /**
     * Observes the job status and, only if the job status observer reports the job as found,
     * the savepoint status followed by the checkpoint status.
     *
     * @param ctx context of the session job being observed
     */
    @Override
    protected void observeInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
        boolean jobFound = jobStatusObserver.observe(ctx);
        if (jobFound) {
            savepointObserver.observeSavepointStatus(ctx);
            savepointObserver.observeCheckpointStatus(ctx);
        }
    }

    /**
     * Checks whether the job referenced by the resource status is already deployed.
     *
     * <p>Returns {@code false} when the status carries no job id. Otherwise the Flink service is
     * asked for the status of that job id using the observe configuration; the job counts as
     * deployed when a status is returned and its state is neither {@link JobStatus#CANCELLING}
     * nor {@link JobStatus#CANCELED}.
     *
     * @param ctx context of the session job being observed
     * @return {@code true} if a status was returned for the job id and it is not in a
     *     cancelling/canceled state, {@code false} otherwise
     * @throws RuntimeException wrapping any exception raised while reading the status or
     *     querying the Flink service
     */
    @Override
    protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> ctx) {
        FlinkSessionJob flinkSessionJob = ctx.getResource();
        try {
            // Fully qualified: the simple name JobStatus is taken by the imported Flink enum.
            org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus =
                    ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus();
            if (jobStatus.getJobId() == null) {
                // No job id recorded in the status, so there is nothing to look up.
                return false;
            }
            JobID jobId = JobID.fromHexString(jobStatus.getJobId());
            boolean deployed =
                    ctx.getFlinkService()
                            .getJobStatus(ctx.getObserveConfig(), jobId)
                            .map(
                                    jsm ->
                                            !JobStatus.CANCELLING.equals(jsm.getJobState())
                                                    && !JobStatus.CANCELED.equals(
                                                            jsm.getJobState()))
                            // No status returned for this job id: treat as not deployed.
                            .orElse(false);
            if (deployed) {
                LOG.info("Job with id {} is already deployed.", jobId);
            }
            return deployed;
        } catch (Exception e) {
            throw new RuntimeException("Failed to list jobs", e);
        }
    }
}

