/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reconciler for {@link FlinkDeployment} resources that are run as session clusters.
 *
 * <p>All cluster-level operations (submit, delete, wait for shutdown) are delegated to the
 * {@link FlinkService} supplied at construction time; configuration is resolved through the
 * inherited {@code configManager}.
 */
public class SessionReconciler
        extends AbstractFlinkResourceReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);

    /** Service used for every cluster operation performed by this reconciler. */
    protected final FlinkService flinkService;

    /**
     * Creates a session reconciler.
     *
     * @param kubernetesClient client handed to the parent reconciler (also used for ingress updates)
     * @param flinkService service used to submit and delete session clusters
     * @param configManager source of deploy/observe configurations and operator settings
     * @param eventRecorder recorder used to emit events on the resource
     * @param statusRecorder recorder used to persist status changes
     */
    public SessionReconciler(
            KubernetesClient kubernetesClient,
            FlinkService flinkService,
            FlinkConfigManager configManager,
            EventRecorder eventRecorder,
            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
        this.flinkService = flinkService;
    }

    /**
     * Returns the service injected through the constructor; neither argument is consulted.
     *
     * @param resource ignored
     * @param context ignored
     * @return the {@link FlinkService} held by this reconciler
     */
    @Override
    protected FlinkService getFlinkService(FlinkDeployment resource, Context<?> context) {
        return this.flinkService;
    }

    /**
     * Resolves the deploy configuration for the given metadata and spec via the config manager.
     *
     * @param meta metadata of the resource being deployed
     * @param spec spec to derive the configuration from
     * @param ctx ignored
     * @return the deploy configuration produced by the config manager
     */
    @Override
    protected Configuration getDeployConfig(ObjectMeta meta, FlinkDeploymentSpec spec, Context<?> ctx) {
        return this.configManager.getDeployConfig(meta, spec);
    }

    /**
     * Resolves the observe configuration for the given resource via the config manager.
     *
     * @param resource resource to resolve the configuration for
     * @param context ignored
     * @return the observe configuration produced by the config manager
     */
    @Override
    protected Configuration getObserveConfig(FlinkDeployment resource, Context<?> context) {
        return this.configManager.getObserveConfig(resource);
    }

    /**
     * Session deployments have no readiness precondition here.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean readyToReconcile(FlinkDeployment deployment, Context<?> ctx, Configuration deployConfig) {
        return true;
    }

    /**
     * Applies a spec change by tearing the current cluster down and deploying it again.
     *
     * <p>The status is updated and patched <em>before</em> the deployment attempt, and the
     * deployed spec is recorded only after {@code deploy} returned without throwing.
     *
     * @param deployment resource whose spec changed
     * @param ctx reconciliation context, forwarded to {@code deploy}
     * @param observeConfig configuration used while waiting for the old cluster to shut down
     * @param deployConfig configuration used for the new deployment
     * @param type kind of spec difference; not used by this implementation
     * @throws Exception if deleting, deploying or patching the status fails
     */
    @Override
    protected void reconcileSpecChange(
            FlinkDeployment deployment,
            Context<?> ctx,
            Configuration observeConfig,
            Configuration deployConfig,
            DiffType type)
            throws Exception {
        this.deleteSessionCluster(deployment, observeConfig);
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(deployment, deployConfig);
        this.statusRecorder.patchAndCacheStatus(deployment);
        this.deploy(
                deployment,
                deployment.getSpec(),
                deployment.getStatus(),
                ctx,
                deployConfig,
                Optional.empty(),
                false);
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, deployConfig);
    }

    /**
     * Deletes the cluster deployment of the given resource and waits for it to shut down.
     *
     * @param deployment resource whose cluster is deleted
     * @param effectiveConfig configuration passed to the shutdown wait
     */
    private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
        // NOTE(review): the trailing 'false' presumably means "keep HA data" - confirm against FlinkService.
        this.flinkService.deleteClusterDeployment(deployment.getMetadata(), deployment.getStatus(), false);
        this.flinkService.waitForClusterShutdown(effectiveConfig);
    }

    /**
     * Submits a session cluster for the resource and marks the JobManager as deploying.
     *
     * <p>Order of operations: set the owner reference, submit the cluster, set the JobManager
     * deployment status to {@code DEPLOYING}, then update the ingress rules.
     *
     * @param cr resource being deployed
     * @param spec spec used for the ingress rules
     * @param status status object that receives the {@code DEPLOYING} state
     * @param ctx not used by this implementation
     * @param deployConfig configuration the cluster is submitted with
     * @param savepoint not used by this implementation
     * @param requireHaMetadata not used by this implementation
     * @throws Exception if the submission or the ingress update fails
     */
    @Override
    protected void deploy(
            FlinkDeployment cr,
            FlinkDeploymentSpec spec,
            FlinkDeploymentStatus status,
            Context<?> ctx,
            Configuration deployConfig,
            Optional<String> savepoint,
            boolean requireHaMetadata)
            throws Exception {
        this.setOwnerReference(cr, deployConfig);
        this.flinkService.submitSessionCluster(deployConfig);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(cr.getMetadata(), spec, deployConfig, this.kubernetesClient);
    }

    /**
     * Redeploys the cluster from the last stable spec stored in the reconciliation status and
     * marks the reconciliation state as {@code ROLLED_BACK}.
     *
     * @param deployment resource to roll back
     * @param ctx reconciliation context, forwarded to {@code deploy}
     * @param observeConfig configuration used while waiting for the current cluster to shut down
     * @throws Exception if deleting or redeploying the cluster fails
     */
    @Override
    protected void rollback(FlinkDeployment deployment, Context<?> ctx, Configuration observeConfig)
            throws Exception {
        FlinkDeploymentStatus status = deployment.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
        FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
        Configuration rollbackConfig =
                this.configManager.getDeployConfig(deployment.getMetadata(), rollbackSpec);

        this.deleteSessionCluster(deployment, observeConfig);
        // The status is fetched again rather than reusing 'status', matching the original call.
        this.deploy(
                deployment,
                rollbackSpec,
                deployment.getStatus(),
                ctx,
                rollbackConfig,
                Optional.empty(),
                false);
        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
    }

    /**
     * Handles reconciliation that is not triggered by a spec change: if the parent class
     * reports that the deployment should be recovered, the session cluster is resubmitted.
     *
     * @param flinkApp resource being reconciled
     * @param ctx not used by this implementation
     * @param observeConfig configuration used for the recovery check and the resubmission
     * @return {@code true} if a recovery was triggered, {@code false} otherwise
     * @throws Exception if resubmitting the cluster fails
     */
    @Override
    public boolean reconcileOtherChanges(FlinkDeployment flinkApp, Context<?> ctx, Configuration observeConfig)
            throws Exception {
        if (this.shouldRecoverDeployment(observeConfig, flinkApp)) {
            this.recoverSession(flinkApp, observeConfig);
            return true;
        }
        return false;
    }

    /**
     * Resubmits the session cluster and marks the JobManager as deploying.
     *
     * @param deployment resource whose status is updated
     * @param effectiveConfig configuration the cluster is submitted with
     * @throws Exception if the submission fails
     */
    private void recoverSession(FlinkDeployment deployment, Configuration effectiveConfig) throws Exception {
        this.flinkService.submitSessionCluster(effectiveConfig);
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
    }

    /**
     * Cleans up the session cluster when the resource is deleted.
     *
     * <p>If the context still reports {@link FlinkSessionJob} secondary resources, the cluster
     * is left untouched: a {@code CleanupFailed} warning event is triggered and the cleanup is
     * rescheduled after the operator's reconcile interval without removing the finalizer.
     * Otherwise the cluster deployment is deleted and the default delete control is returned.
     *
     * @param deployment resource being deleted
     * @param context context used to look up the session jobs attached to this cluster
     * @return a rescheduling, finalizer-preserving control while session jobs exist, otherwise
     *     {@link DeleteControl#defaultDelete()}
     */
    @Override
    public DeleteControl cleanupInternal(FlinkDeployment deployment, Context<?> context) {
        // Parameterized on purpose: with a raw Set the lambda below would see Object elements.
        Set<FlinkSessionJob> sessionJobs = context.getSecondaryResources(FlinkSessionJob.class);
        if (!sessionJobs.isEmpty()) {
            String error =
                    String.format(
                            "The session jobs %s should be deleted first",
                            sessionJobs.stream()
                                    .map(job -> job.getMetadata().getName())
                                    .collect(Collectors.toList()));
            // Log only when triggerEvent reports true, so the warning is not repeated otherwise.
            // The explicit upcast is kept so overload resolution stays exactly as before.
            if (this.eventRecorder.triggerEvent(
                    (AbstractFlinkResource<?, ?>) deployment,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.CleanupFailed,
                    EventRecorder.Component.Operator,
                    error)) {
                LOG.warn(error);
            }
            return DeleteControl.noFinalizerRemoval()
                    .rescheduleAfter(
                            this.configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
        }
        LOG.info("Stopping session cluster");
        // NOTE(review): the trailing 'true' presumably also removes HA data - confirm against FlinkService.
        this.flinkService.deleteClusterDeployment(deployment.getMetadata(), deployment.getStatus(), true);
        return DeleteControl.defaultDelete();
    }
}

