/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.service.SuspendMode;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionJobReconciler
extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);

    public SessionJobReconciler(EventRecorder eventRecorder, StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder, JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
        super(eventRecorder, statusRecorder, autoscaler);
    }

    @Override
    public boolean readyToReconcile(FlinkResourceContext<FlinkSessionJob> ctx) {
        return SessionJobReconciler.sessionClusterReady(ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class)) && super.readyToReconcile(ctx);
    }

    @Override
    public void deploy(FlinkResourceContext<FlinkSessionJob> ctx, FlinkSessionJobSpec sessionJobSpec, Configuration deployConfig, Optional<String> savepoint, boolean requireHaMetadata) throws Exception {
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)ctx.getResource(), EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.Job, "Starting deployment", ctx.getKubernetesClient());
        JobID jobId = JobID.generate();
        ((FlinkSessionJobStatus)ctx.getResource().getStatus()).getJobStatus().setJobId(jobId.toHexString());
        this.statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
        ctx.getFlinkService().submitJobToSessionCluster(ctx.getResource().getMetadata(), sessionJobSpec, jobId, deployConfig, savepoint.orElse(null));
        FlinkSessionJobStatus status = (FlinkSessionJobStatus)ctx.getResource().getStatus();
        status.getJobStatus().setState(JobStatus.RECONCILING);
    }

    @Override
    protected boolean cancelJob(FlinkResourceContext<FlinkSessionJob> ctx, SuspendMode suspendMode) throws Exception {
        FlinkService.CancelResult result = ctx.getFlinkService().cancelSessionJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
        result.getSavepointPath().ifPresent(location -> this.setUpgradeSavepointPath(ctx, (String)location));
        return result.isPending();
    }

    @Override
    protected void cleanupAfterFailedJob(FlinkResourceContext<FlinkSessionJob> ctx) {
    }

    @Override
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
        FlinkSessionJobStatus status = (FlinkSessionJobStatus)ctx.getResource().getStatus();
        long delay = ctx.getOperatorConfig().getProgressCheckInterval().toMillis();
        if (status.getReconciliationStatus().isBeforeFirstDeployment() || ReconciliationUtils.isJobInTerminalState(status) || ((FlinkSessionJobSpec)status.getReconciliationStatus().deserializeLastReconciledSpec()).getJob().getState() == JobState.SUSPENDED || "Job Not Found".equals(status.getError())) {
            return DeleteControl.defaultDelete();
        }
        if (ReconciliationUtils.isJobCancelling(status)) {
            LOG.info("Waiting for pending cancellation");
            return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
        }
        Optional flinkDepOptional = ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
        if (flinkDepOptional.isPresent()) {
            String jobID = ((FlinkSessionJobStatus)ctx.getResource().getStatus()).getJobStatus().getJobId();
            if (jobID != null) {
                try {
                    SuspendMode suspendMode;
                    Configuration observeConfig = ctx.getObserveConfig();
                    SuspendMode suspendMode2 = suspendMode = observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION) ? SuspendMode.SAVEPOINT : SuspendMode.STATELESS;
                    if (this.cancelJob(ctx, suspendMode)) {
                        LOG.info("Waiting for pending cancellation");
                        return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
                    }
                }
                catch (Exception e) {
                    LOG.error("Failed to cancel job, will reschedule after {} milliseconds.", (Object)delay, (Object)e);
                    return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
                }
            }
        } else {
            LOG.info("Session cluster deployment not available");
        }
        return DeleteControl.defaultDelete();
    }

    public static boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) {
        if (flinkDeploymentOpt.isPresent()) {
            FlinkDeployment flinkdep = flinkDeploymentOpt.get();
            JobManagerDeploymentStatus jobmanagerDeploymentStatus = ((FlinkDeploymentStatus)flinkdep.getStatus()).getJobManagerDeploymentStatus();
            if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) {
                LOG.info("Session cluster deployment is in {} status, not ready for serve", (Object)jobmanagerDeploymentStatus);
                return false;
            }
            return true;
        }
        LOG.warn("Session cluster deployment is not found");
        return false;
    }
}

