/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionJobReconciler
extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);

    public SessionJobReconciler(EventRecorder eventRecorder, StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder, JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
        super(eventRecorder, statusRecorder, autoscaler);
    }

    @Override
    public boolean readyToReconcile(FlinkResourceContext<FlinkSessionJob> ctx) {
        return SessionJobReconciler.sessionClusterReady(ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class)) && super.readyToReconcile(ctx);
    }

    @Override
    public void deploy(FlinkResourceContext<FlinkSessionJob> ctx, FlinkSessionJobSpec sessionJobSpec, Configuration deployConfig, Optional<String> savepoint, boolean requireHaMetadata) throws Exception {
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)ctx.getResource(), EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.Job, "Starting deployment", ctx.getKubernetesClient());
        JobID jobId = JobID.generate();
        ((FlinkSessionJobStatus)ctx.getResource().getStatus()).getJobStatus().setJobId(jobId.toHexString());
        this.statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
        ctx.getFlinkService().submitJobToSessionCluster(ctx.getResource().getMetadata(), sessionJobSpec, jobId, deployConfig, savepoint.orElse(null));
        FlinkSessionJobStatus status = (FlinkSessionJobStatus)ctx.getResource().getStatus();
        status.getJobStatus().setState(JobStatus.RECONCILING.name());
    }

    @Override
    protected void cancelJob(FlinkResourceContext<FlinkSessionJob> ctx, UpgradeMode upgradeMode) throws Exception {
        ctx.getFlinkService().cancelSessionJob(ctx.getResource(), upgradeMode, ctx.getObserveConfig());
        ((FlinkSessionJobStatus)ctx.getResource().getStatus()).getJobStatus().setJobId(null);
    }

    @Override
    protected void cleanupAfterFailedJob(FlinkResourceContext<FlinkSessionJob> ctx) {
    }

    @Override
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx) {
        Optional flinkDepOptional = ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class);
        if (flinkDepOptional.isPresent()) {
            String jobID = ((FlinkSessionJobStatus)ctx.getResource().getStatus()).getJobStatus().getJobId();
            if (jobID != null) {
                try {
                    Configuration observeConfig = ctx.getObserveConfig();
                    UpgradeMode upgradeMode = observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION) ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS;
                    this.cancelJob(ctx, upgradeMode);
                }
                catch (ExecutionException e) {
                    if (ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent()) {
                        LOG.error("Job {} not found in the Flink cluster.", (Object)jobID, (Object)e);
                        return DeleteControl.defaultDelete();
                    }
                    if (ExceptionUtils.findThrowable((Throwable)e, FlinkJobTerminatedWithoutCancellationException.class).isPresent()) {
                        LOG.error("Job {} already terminated without cancellation.", (Object)jobID, (Object)e);
                        return DeleteControl.defaultDelete();
                    }
                    long delay = ctx.getOperatorConfig().getProgressCheckInterval().toMillis();
                    LOG.error("Failed to cancel job {}, will reschedule after {} milliseconds.", new Object[]{jobID, delay, e});
                    return DeleteControl.noFinalizerRemoval().rescheduleAfter(delay);
                }
                catch (Exception e) {
                    LOG.error("Failed to cancel job {}.", (Object)jobID, (Object)e);
                }
            }
        } else {
            LOG.info("Session cluster deployment not available");
        }
        return DeleteControl.defaultDelete();
    }

    public static boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) {
        if (flinkDeploymentOpt.isPresent()) {
            FlinkDeployment flinkdep = flinkDeploymentOpt.get();
            JobManagerDeploymentStatus jobmanagerDeploymentStatus = ((FlinkDeploymentStatus)flinkdep.getStatus()).getJobManagerDeploymentStatus();
            if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) {
                LOG.info("Session cluster deployment is in {} status, not ready for serve", (Object)jobmanagerDeploymentStatus);
                return false;
            }
            return true;
        }
        LOG.warn("Session cluster deployment is not found");
        return false;
    }
}

