/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionReconciler
extends AbstractFlinkResourceReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);

    public SessionReconciler(EventRecorder eventRecorder, StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
        super(eventRecorder, statusRecorder, (JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext>)new NoopJobAutoscaler());
    }

    @Override
    protected boolean readyToReconcile(FlinkResourceContext<FlinkDeployment> ctx) {
        return true;
    }

    @Override
    protected boolean reconcileSpecChange(DiffType diffType, FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig, FlinkDeploymentSpec lastReconciledSpec) throws Exception {
        FlinkDeployment deployment = ctx.getResource();
        this.deleteSessionCluster(ctx);
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(deployment, deployConfig, this.clock);
        this.statusRecorder.patchAndCacheStatus(deployment, ctx.getKubernetesClient());
        this.deploy(ctx, (FlinkDeploymentSpec)deployment.getSpec(), deployConfig, Optional.empty(), false);
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, deployConfig, this.clock);
        return true;
    }

    private void deleteSessionCluster(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment deployment = ctx.getResource();
        Configuration conf = ctx.getDeployConfig((AbstractFlinkSpec)ctx.getResource().getSpec());
        ctx.getFlinkService().deleteClusterDeployment(deployment.getMetadata(), (FlinkDeploymentStatus)deployment.getStatus(), conf, false);
    }

    @Override
    public void deploy(FlinkResourceContext<FlinkDeployment> ctx, FlinkDeploymentSpec spec, Configuration deployConfig, Optional<String> savepoint, boolean requireHaMetadata) throws Exception {
        FlinkDeployment cr = ctx.getResource();
        this.setOwnerReference(cr, deployConfig);
        ctx.getFlinkService().submitSessionCluster(deployConfig);
        ((FlinkDeploymentStatus)cr.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(cr.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
    }

    @Override
    public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
        if (this.shouldRecoverDeployment(ctx.getObserveConfig(), ctx.getResource())) {
            this.recoverSession(ctx);
            return true;
        }
        return false;
    }

    private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
        ctx.getFlinkService().submitSessionCluster(ctx.getObserveConfig());
        ((FlinkDeploymentStatus)ctx.getResource().getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
    }

    @Override
    public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
        Set sessionJobs = ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class);
        FlinkDeployment deployment = ctx.getResource();
        if (!sessionJobs.isEmpty()) {
            String error = String.format("The session jobs %s should be deleted first", sessionJobs.stream().map(job -> job.getMetadata().getName()).collect(Collectors.toList()));
            if (this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>)deployment, EventRecorder.Type.Warning, EventRecorder.Reason.CleanupFailed, EventRecorder.Component.Operator, error, ctx.getKubernetesClient())) {
                LOG.warn(error);
            }
            return DeleteControl.noFinalizerRemoval().rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
        }
        LOG.info("Stopping session cluster");
        Configuration conf = ctx.getDeployConfig((AbstractFlinkSpec)ctx.getResource().getSpec());
        ctx.getFlinkService().deleteClusterDeployment(deployment.getMetadata(), (FlinkDeploymentStatus)deployment.getStatus(), conf, true);
        return DeleteControl.defaultDelete();
    }
}

