/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionJobReconciler
extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);
    private final FlinkServiceFactory flinkServiceFactory;

    public SessionJobReconciler(KubernetesClient kubernetesClient, FlinkServiceFactory flinkServiceFactory, FlinkConfigManager configManager, EventRecorder eventRecorder, StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
        this.flinkServiceFactory = flinkServiceFactory;
    }

    @Override
    protected FlinkService getFlinkService(FlinkSessionJob resource, Context<?> context) {
        Optional deploymentOpt = context.getSecondaryResource(FlinkDeployment.class);
        if (SessionJobReconciler.sessionClusterReady(deploymentOpt)) {
            return this.flinkServiceFactory.getOrCreate((FlinkDeployment)deploymentOpt.get());
        }
        return null;
    }

    @Override
    protected Configuration getObserveConfig(FlinkSessionJob sessionJob, Context<?> context) {
        return this.getDeployConfig(sessionJob.getMetadata(), (FlinkSessionJobSpec)sessionJob.getSpec(), context);
    }

    @Override
    protected Configuration getDeployConfig(ObjectMeta deployMeta, FlinkSessionJobSpec currentDeploySpec, Context<?> context) {
        Optional deploymentOpt = context.getSecondaryResource(FlinkDeployment.class);
        if (!SessionJobReconciler.sessionClusterReady(deploymentOpt)) {
            return null;
        }
        return this.configManager.getSessionJobConfig((FlinkDeployment)deploymentOpt.get(), currentDeploySpec);
    }

    @Override
    public boolean readyToReconcile(FlinkSessionJob flinkSessionJob, Context<?> context, Configuration deployConfig) {
        return SessionJobReconciler.sessionClusterReady(context.getSecondaryResource(FlinkDeployment.class)) && super.readyToReconcile(flinkSessionJob, context, deployConfig);
    }

    @Override
    protected void deploy(FlinkSessionJob cr, FlinkSessionJobSpec sessionJobSpec, FlinkSessionJobStatus status, Context<?> ctx, Configuration deployConfig, Optional<String> savepoint, boolean requireHaMetadata) throws Exception {
        FlinkService flinkService = this.getFlinkService(cr, ctx);
        JobID jobID = flinkService.submitJobToSessionCluster(cr.getMetadata(), sessionJobSpec, deployConfig, savepoint.orElse(null));
        status.setJobStatus(new org.apache.flink.kubernetes.operator.api.status.JobStatus().toBuilder().jobId(jobID.toHexString()).state(JobStatus.RECONCILING.name()).build());
    }

    @Override
    protected void cancelJob(FlinkSessionJob resource, Context<?> ctx, UpgradeMode upgradeMode, Configuration observeConfig) throws Exception {
        FlinkService flinkService = this.getFlinkService(resource, ctx);
        flinkService.cancelSessionJob(resource, upgradeMode, observeConfig);
    }

    @Override
    protected void cleanupAfterFailedJob(FlinkSessionJob resource, Context<?> ctx, Configuration observeConfig) throws Exception {
    }

    @Override
    public DeleteControl cleanupInternal(FlinkSessionJob sessionJob, Context<?> context) {
        Optional flinkDepOptional = context.getSecondaryResource(FlinkDeployment.class);
        if (flinkDepOptional.isPresent()) {
            String jobID = ((FlinkSessionJobStatus)sessionJob.getStatus()).getJobStatus().getJobId();
            if (jobID != null) {
                try {
                    this.cancelJob(sessionJob, context, UpgradeMode.STATELESS, this.getObserveConfig(sessionJob, context));
                }
                catch (Exception e) {
                    LOG.error("Failed to cancel job {}.", (Object)jobID, (Object)e);
                }
            }
        } else {
            LOG.info("Session cluster deployment not available");
        }
        return DeleteControl.defaultDelete();
    }

    public static boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) {
        if (flinkDeploymentOpt.isPresent()) {
            FlinkDeployment flinkdep = flinkDeploymentOpt.get();
            JobManagerDeploymentStatus jobmanagerDeploymentStatus = ((FlinkDeploymentStatus)flinkdep.getStatus()).getJobManagerDeploymentStatus();
            if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) {
                LOG.info("Session cluster deployment is in {} status, not ready for serve", (Object)jobmanagerDeploymentStatus);
                return false;
            }
            return true;
        }
        LOG.warn("Session cluster deployment is not found");
        return false;
    }
}

