/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentFluent;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.EditReplacePatchable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.Waitable;
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.service.SuspendMode;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AbstractFlinkService} implementation that deploys Flink clusters through Flink's own
 * cluster client deployers ({@link ApplicationClusterDeployer} / {@link ClusterDescriptor}) and
 * manages the resulting JobManager deployment and pods through the Kubernetes client.
 *
 * <p>In addition to deployment and deletion, this service can rescale a running job in place by
 * updating the job's vertex resource requirements over the Flink REST API (see {@link
 * #scale(FlinkResourceContext, Configuration)}).
 */
public class NativeFlinkService extends AbstractFlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);

    /** Partial deployment used as a merge patch that sets the replica count to zero. */
    private static final Deployment SCALE_TO_ZERO =
            new DeploymentBuilder().editOrNewSpec().withReplicas(0).endSpec().build();

    /** Upper bound on the time spent waiting for JobManager pods after scaling to zero. */
    private static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1L);

    private final EventRecorder eventRecorder;

    /**
     * Creates the service.
     *
     * @param kubernetesClient client passed to the superclass and used for Kubernetes calls
     * @param artifactManager passed through to the superclass
     * @param executorService passed through to the superclass
     * @param operatorConfig operator configuration (timeouts are read from it)
     * @param eventRecorder recorder used to emit an event when in-place scaling is triggered
     */
    public NativeFlinkService(
            KubernetesClient kubernetesClient,
            ArtifactManager artifactManager,
            ExecutorService executorService,
            FlinkOperatorConfiguration operatorConfig,
            EventRecorder eventRecorder) {
        super(kubernetesClient, artifactManager, executorService, operatorConfig);
        this.eventRecorder = eventRecorder;
    }

    /**
     * Deploys an application cluster using Flink's {@link ApplicationClusterDeployer}.
     *
     * @param jobSpec supplies the program arguments (an empty array is used when {@code null})
     *     and the entry class
     * @param conf Flink configuration handed to the deployer
     * @throws Exception if the deployer fails
     */
    @Override
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
        LOG.info("Deploying application cluster");
        ClusterClientServiceLoader clusterClientServiceLoader =
                new DefaultClusterClientServiceLoader();
        ApplicationClusterDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);
        String[] programArgs = jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0];
        ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(programArgs, jobSpec.getEntryClass());
        deployer.run(conf, applicationConfiguration);
        LOG.info("Application cluster successfully deployed");
    }

    /**
     * Deploys a session cluster after passing the configuration through {@code
     * removeOperatorConfigs}.
     *
     * @param conf configuration of the session cluster
     * @throws Exception if the deployment fails
     */
    @Override
    public void deploySessionCluster(Configuration conf) throws Exception {
        submitClusterInternal(removeOperatorConfigs(conf));
    }

    /**
     * Cancels the job of the given deployment by delegating to the four-argument {@code
     * cancelJob} overload with {@code false} as the last argument.
     *
     * @param deployment deployment whose job is cancelled
     * @param suspendMode how the job should be suspended
     * @param configuration configuration used for the cancellation
     * @return the result produced by the delegated call
     * @throws Exception if the delegated call fails
     */
    @Override
    public FlinkService.CancelResult cancelJob(
            FlinkDeployment deployment, SuspendMode suspendMode, Configuration configuration)
            throws Exception {
        return cancelJob(deployment, suspendMode, configuration, false);
    }

    /**
     * Lists the pods in {@code namespace} that match the JobManager selector labels of the
     * cluster.
     *
     * @param namespace namespace to search
     * @param clusterId cluster id used to build the JobManager selectors
     * @return the matching pods
     */
    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        return kubernetesClient
                .pods()
                .inNamespace(namespace)
                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
                .list();
    }

    /**
     * Deploys a session cluster with the cluster descriptor created by the cluster client factory
     * that the service loader selects for {@code conf}. The descriptor is closed afterwards.
     *
     * @param conf configuration of the session cluster
     * @throws Exception if the deployment fails
     */
    protected void submitClusterInternal(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        ClusterClientServiceLoader clusterClientServiceLoader =
                new DefaultClusterClientServiceLoader();
        ClusterClientFactory<String> kubernetesClusterClientFactory =
                clusterClientServiceLoader.getClusterClientFactory(conf);
        try (ClusterDescriptor<String> kubernetesClusterDescriptor =
                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
            kubernetesClusterDescriptor.deploySessionCluster(
                    kubernetesClusterClientFactory.getClusterSpecification(conf));
        }
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Deletes the JobManager deployment of the cluster. With {@link
     * DeletionPropagation#FOREGROUND} the deployment is first scaled to zero (best effort) and the
     * time left after that step is used as the timeout for the actual deletion.
     *
     * @param namespace namespace of the cluster
     * @param clusterId cluster id used to derive the deployment name
     * @param conf not used by this implementation
     * @param deletionPropagation propagation policy passed to the deployment deletion
     */
    @Override
    protected void deleteClusterInternal(
            String namespace,
            String clusterId,
            Configuration conf,
            DeletionPropagation deletionPropagation) {
        RollableScalableResource<Deployment> jmDeployment =
                kubernetesClient
                        .apps()
                        .deployments()
                        .inNamespace(namespace)
                        .withName(KubernetesUtils.getDeploymentName(clusterId));
        Duration remainingTimeout = operatorConfig.getFlinkShutdownClusterTimeout();
        if (deletionPropagation == DeletionPropagation.FOREGROUND) {
            remainingTimeout =
                    shutdownJobManagersBlocking(
                            jmDeployment, namespace, clusterId, remainingTimeout);
        }
        deleteDeploymentBlocking("JobManager", jmDeployment, deletionPropagation, remainingTimeout);
    }

    /**
     * Tries to apply changed parallelism overrides to the running job without a redeployment.
     *
     * <p>The current vertex resource requirements are fetched from the job; for each vertex that
     * has an entry in the new parallelism overrides, the upper bound is set to the override and
     * the lower bound to the smaller of the override and the current lower bound. If a vertex had
     * an override in the observed configuration but has none in the new one, in-place scaling is
     * abandoned.
     *
     * @param ctx resource context providing the resource, the observe configuration and the
     *     Kubernetes client used for event reporting
     * @param deployConfig configuration containing the target parallelism overrides
     * @return {@code true} if the requirements were updated or already matched the target;
     *     {@code false} if in-place scaling is unsupported, not applicable, or failed, in which
     *     case the caller is expected to fall back to a regular upgrade
     * @throws Exception declared by the overridden method; failures while talking to the cluster
     *     are logged and reported as {@code false} instead
     */
    @Override
    public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) throws Exception {
        AbstractFlinkResource<?, ?> resource = ctx.getResource();
        Configuration observeConfig = ctx.getObserveConfig();
        if (!supportsInPlaceScaling(resource, observeConfig)) {
            return false;
        }

        Map<String, String> newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        Map<String, String> previousOverrides =
                observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
            LOG.info("No overrides defined before or after. Cannot scale in-place.");
            return false;
        }

        try (RestClusterClient<String> client = getClusterClient(observeConfig)) {
            // Work on a mutable copy so entries can be replaced while iterating.
            Map<JobVertexID, JobVertexResourceRequirements> requirements =
                    new HashMap<>(getVertexResources(client, resource));
            boolean alreadyScaled = true;

            for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                    requirements.entrySet()) {
                String vertexId = entry.getKey().toString();
                JobVertexResourceRequirements.Parallelism parallelism =
                        entry.getValue().getParallelism();
                String overrideStr = newOverrides.get(vertexId);

                if (overrideStr == null) {
                    // An override that existed before but is gone now cannot be expressed as a
                    // requirements update, so give up on in-place scaling.
                    if (previousOverrides.containsKey(vertexId)) {
                        LOG.info(
                                "Parallelism override for {} has been removed, falling back to regular upgrade.",
                                vertexId);
                        return false;
                    }
                    continue;
                }

                int upperBound = Integer.parseInt(overrideStr);
                int lowerBound = Math.min(upperBound, parallelism.getLowerBound());
                JobVertexResourceRequirements.Parallelism newParallelism =
                        new JobVertexResourceRequirements.Parallelism(lowerBound, upperBound);
                if (!parallelism.equals(newParallelism)) {
                    entry.setValue(new JobVertexResourceRequirements(newParallelism));
                    alreadyScaled = false;
                }
            }

            if (alreadyScaled) {
                LOG.info("Vertex resources requirements already match target, nothing to do...");
            } else {
                updateVertexResources(client, resource, requirements);
                eventRecorder.triggerEvent(
                        resource,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.Scaling,
                        EventRecorder.Component.Job,
                        "In-place scaling triggered",
                        ctx.getKubernetesClient());
            }
            return true;
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Catching the exception clears the interrupt flag; restore it so that code
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            LOG.error("Error while rescaling, falling back to regular upgrade", t);
            return false;
        }
    }

    /**
     * Checks the preconditions for in-place scaling: the resource has a job spec, in-place
     * scaling is enabled, the Flink version is at least 1.18, the adaptive scheduler is
     * configured, and the job is neither in a terminal state nor reconciling.
     */
    private static boolean supportsInPlaceScaling(
            AbstractFlinkResource<?, ?> resource, Configuration observeConfig) {
        if (resource.getSpec().getJob() == null
                || !observeConfig.get(
                        KubernetesOperatorConfigOptions.JOB_UPGRADE_INPLACE_SCALING_ENABLED)) {
            return false;
        }
        if (!observeConfig.get(FlinkConfigBuilder.FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_18)) {
            LOG.debug("In-place rescaling is only available starting from Flink 1.18");
            return false;
        }
        if (observeConfig.get(JobManagerOptions.SCHEDULER)
                != JobManagerOptions.SchedulerType.Adaptive) {
            LOG.debug("In-place rescaling is only available with the adaptive scheduler");
            return false;
        }
        CommonStatus<?> status = resource.getStatus();
        if (ReconciliationUtils.isJobInTerminalState(status)
                || JobStatus.RECONCILING.equals(status.getJobStatus().getState())) {
            LOG.info("Job in terminal or reconciling state cannot be scaled in-place");
            return false;
        }
        return true;
    }

    /**
     * Sends the new vertex resource requirements of the resource's job to the cluster and waits
     * for the response, for at most the configured Flink client timeout.
     *
     * @param client REST client of the target cluster
     * @param resource resource whose status holds the job id
     * @param newReqs requirements to apply, keyed by vertex id
     * @throws Exception if the request fails or does not complete within the timeout
     */
    @VisibleForTesting
    protected void updateVertexResources(
            RestClusterClient<String> client,
            AbstractFlinkResource<?, ?> resource,
            Map<JobVertexID, JobVertexResourceRequirements> newReqs)
            throws Exception {
        JobMessageParameters jobParameters = jobParametersFor(resource);
        JobResourceRequirementsBody requestBody =
                new JobResourceRequirementsBody(new JobResourceRequirements(newReqs));
        // Wait in milliseconds so that sub-second timeouts are not truncated to zero.
        client.sendRequest(new JobResourcesRequirementsUpdateHeaders(), jobParameters, requestBody)
                .get(operatorConfig.getFlinkClientTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Fetches the current vertex resource requirements of the resource's job from the cluster,
     * waiting for at most the configured Flink client timeout.
     *
     * @param client REST client of the target cluster
     * @param resource resource whose status holds the job id
     * @return the current requirements keyed by vertex id
     * @throws IllegalStateException if the response carries no resource requirements
     * @throws Exception if the request fails or does not complete within the timeout
     */
    @VisibleForTesting
    protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
            RestClusterClient<String> client, AbstractFlinkResource<?, ?> resource)
            throws Exception {
        JobMessageParameters jobParameters = jobParametersFor(resource);
        // Wait in milliseconds so that sub-second timeouts are not truncated to zero.
        JobResourceRequirementsBody currentRequirements =
                client.sendRequest(
                                new JobResourceRequirementsHeaders(),
                                jobParameters,
                                EmptyRequestBody.getInstance())
                        .get(
                                operatorConfig.getFlinkClientTimeout().toMillis(),
                                TimeUnit.MILLISECONDS);
        return currentRequirements
                .asJobResourceRequirements()
                .orElseThrow(
                        () ->
                                new IllegalStateException(
                                        "Job resource requirements response did not contain any requirements"))
                .getJobVertexParallelisms();
    }

    /** Builds REST message parameters addressing the job id stored in the resource's status. */
    private static JobMessageParameters jobParametersFor(AbstractFlinkResource<?, ?> resource) {
        JobMessageParameters jobParameters = new JobMessageParameters();
        jobParameters.jobPathParameter.resolve(
                JobID.fromHexString(resource.getStatus().getJobStatus().getJobId()));
        return jobParameters;
    }

    /**
     * Scales the JobManager deployment to zero replicas and hands the JobManager pod selector to
     * {@code deleteBlocking}, which is given a timeout of at most {@link #JM_SHUTDOWN_MAX_WAIT}
     * or half of {@code remainingTimeout}, whichever is smaller. A failing patch is not treated
     * as an error: the step is skipped and nothing is waited on.
     *
     * @param jmDeployment handle of the JobManager deployment to patch
     * @param namespace namespace of the JobManager pods
     * @param clusterId cluster id used to build the JobManager selectors
     * @param remainingTimeout overall time budget before this step
     * @return {@code remainingTimeout} minus the timeout granted to this step plus the duration
     *     returned by {@code deleteBlocking} (presumably the unused part of that timeout)
     */
    private Duration shutdownJobManagersBlocking(
            EditReplacePatchable<Deployment> jmDeployment,
            String namespace,
            String clusterId,
            Duration remainingTimeout) {
        Duration jmShutdownTimeout =
                ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));
        Duration remaining =
                deleteBlocking(
                        "Scaling JobManager Deployment to zero",
                        () -> {
                            try {
                                jmDeployment.patch(
                                        PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
                            } catch (Exception e) {
                                // Best-effort step: keep going with the deletion, but leave a
                                // trace instead of dropping the failure silently.
                                LOG.debug(
                                        "Could not scale JobManager deployment to zero, skipping wait for pod shutdown",
                                        e);
                                return null;
                            }
                            return kubernetesClient
                                    .pods()
                                    .inNamespace(namespace)
                                    .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
                        },
                        jmShutdownTimeout);
        return remainingTimeout.minus(jmShutdownTimeout).plus(remaining);
    }
}

