/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.GracePeriodConfigurable;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NativeFlinkService
extends AbstractFlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);
    private final EventRecorder eventRecorder;

    /**
     * Creates the service, forwarding the shared collaborators to the parent class and keeping
     * the event recorder for later use.
     *
     * @param kubernetesClient Kubernetes client passed to the parent service
     * @param artifactManager artifact manager passed to the parent service
     * @param executorService executor passed to the parent service
     * @param operatorConfig operator configuration passed to the parent service
     * @param eventRecorder recorder stored on this instance for emitting events
     */
    public NativeFlinkService(
            KubernetesClient kubernetesClient,
            ArtifactManager artifactManager,
            ExecutorService executorService,
            FlinkOperatorConfiguration operatorConfig,
            EventRecorder eventRecorder) {
        super(kubernetesClient, artifactManager, executorService, operatorConfig);
        this.eventRecorder = eventRecorder;
    }

    /**
     * Deploys an application cluster for the given job through an
     * {@link ApplicationClusterDeployer}.
     *
     * @param jobSpec job specification supplying the program arguments and entry class
     * @param conf configuration handed to the deployer
     * @throws Exception if the deployer fails
     */
    @Override
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
        LOG.info("Deploying application cluster");
        ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
        ApplicationClusterDeployer clusterDeployer = new ApplicationClusterDeployer(serviceLoader);

        // A missing argument list is passed on as an empty array rather than null.
        String[] programArgs = jobSpec.getArgs();
        if (programArgs == null) {
            programArgs = new String[0];
        }
        ApplicationConfiguration appConfig =
                new ApplicationConfiguration(programArgs, jobSpec.getEntryClass());

        clusterDeployer.run(conf, appConfig);
        LOG.info("Application cluster successfully deployed");
    }

    /**
     * Submits a session cluster using the result of {@code removeOperatorConfigs(conf)}.
     *
     * @param conf configuration for the session cluster
     * @throws Exception if the deployment fails
     */
    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        // NOTE(review): presumably strips operator-only options before deployment — confirm in the parent class.
        Configuration deploymentConf = removeOperatorConfigs(conf);
        submitClusterInternal(deploymentConf);
    }

    /**
     * Cancels the job of the given deployment by delegating to the four-argument
     * {@code cancelJob} overload with its trailing boolean flag set to {@code false}.
     *
     * @param deployment deployment whose job is cancelled
     * @param upgradeMode upgrade mode forwarded to the overload
     * @param configuration configuration forwarded to the overload
     * @throws Exception if the delegated call fails
     */
    @Override
    public void cancelJob(
            FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
            throws Exception {
        cancelJob(deployment, upgradeMode, configuration, false);
    }

    /**
     * Lists the pods in {@code namespace} whose labels match the JobManager selectors that
     * {@link KubernetesUtils#getJobManagerSelectors(String)} produces for {@code clusterId}.
     *
     * @param namespace namespace to search
     * @param clusterId cluster id used to build the label selectors
     * @return the matching pods
     */
    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        Map<String, String> jobManagerLabels = KubernetesUtils.getJobManagerSelectors(clusterId);
        return kubernetesClient
                .pods()
                .inNamespace(namespace)
                .withLabels(jobManagerLabels)
                .list();
    }

    /**
     * Always returns a freshly created, empty {@link PodList}; both arguments are ignored.
     *
     * @param namespace unused
     * @param clusterId unused
     * @return a new empty pod list
     */
    @Override
    protected PodList getTmPodList(String namespace, String clusterId) {
        PodList emptyPodList = new PodList();
        return emptyPodList;
    }

    /**
     * Deploys a session cluster with the cluster descriptor resolved for {@code conf}.
     *
     * @param conf configuration used to pick the client factory, create the descriptor and
     *     derive the cluster specification
     * @throws Exception if factory lookup, deployment or closing the descriptor fails
     */
    protected void submitClusterInternal(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
        ClusterClientFactory<String> clientFactory = serviceLoader.getClusterClientFactory(conf);
        // The descriptor is closed as soon as the deployment call returns.
        try (ClusterDescriptor<String> descriptor = clientFactory.createClusterDescriptor(conf)) {
            descriptor.deploySessionCluster(clientFactory.getClusterSpecification(conf));
        }
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Deletes the JobManager deployment of the cluster and, when requested, its HA data.
     *
     * @param meta metadata supplying the namespace and the cluster id (resource name)
     * @param conf configuration forwarded to {@code deleteHAData}
     * @param deleteHaData whether HA data is deleted after the deployment
     * @param deletionPropagation propagation policy applied to the deployment deletion
     */
    @Override
    protected void deleteClusterInternal(
            ObjectMeta meta,
            Configuration conf,
            boolean deleteHaData,
            DeletionPropagation deletionPropagation) {
        final String namespace = meta.getNamespace();
        final String clusterId = meta.getName();

        final String haNote;
        if (deleteHaData) {
            haNote = "and HA metadata";
        } else {
            haNote = "while preserving HA metadata";
        }
        LOG.info("Deleting JobManager deployment {}.", haNote);

        kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(KubernetesUtils.getDeploymentName(clusterId))
                .withPropagationPolicy(deletionPropagation)
                .delete();

        if (!deleteHaData) {
            return;
        }
        deleteHAData(namespace, clusterId, conf);
    }

    /**
     * Tries to rescale the job in place by updating the per-vertex resource requirements over
     * the REST API.
     *
     * <p>The target parallelisms come from {@link PipelineOptions#PARALLELISM_OVERRIDES} in
     * {@code deployConfig}; the overrides in the observe config are the previous ones. Every
     * vertex with a new override gets that value as both bounds of its parallelism. If a vertex
     * had an override before but has none now, in-place scaling is abandoned.
     *
     * @param ctx resource context supplying the resource, the observe config and the Kubernetes
     *     client used for the event
     * @param deployConfig configuration carrying the new parallelism overrides
     * @return {@code SCALING_TRIGGERED} if new requirements were sent, {@code ALREADY_SCALED} if
     *     the current requirements already match, {@code CANNOT_SCALE} if in-place scaling is
     *     unsupported, no overrides exist, an override was removed, or any error occurred
     * @throws Exception declared by the interface; failures inside the rescale attempt are
     *     logged and reported as {@code CANNOT_SCALE} instead
     */
    @Override
    public FlinkService.ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig)
            throws Exception {
        AbstractFlinkResource<?, ?> resource = ctx.getResource();
        Configuration observeConfig = ctx.getObserveConfig();
        if (!supportsInPlaceScaling(resource, observeConfig)) {
            return FlinkService.ScalingResult.CANNOT_SCALE;
        }

        Map<String, String> newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        Map<String, String> previousOverrides =
                observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
        if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
            LOG.info("No overrides defined before or after. Cannot scale in-place.");
            return FlinkService.ScalingResult.CANNOT_SCALE;
        }

        try (RestClusterClient<String> client = getClusterClient(observeConfig)) {
            // Copy so entries can be replaced without touching the map returned by the helper.
            Map<JobVertexID, JobVertexResourceRequirements> requirements =
                    new HashMap<>(getVertexResources(client, resource));
            FlinkService.ScalingResult result = FlinkService.ScalingResult.ALREADY_SCALED;

            for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                    requirements.entrySet()) {
                // Overrides are keyed by the string form of the vertex id.
                String vertexId = entry.getKey().toString();
                JobVertexResourceRequirements.Parallelism currentParallelism =
                        entry.getValue().getParallelism();
                String overrideStr = newOverrides.get(vertexId);

                if (overrideStr == null) {
                    // A removed override cannot be reset in place.
                    if (previousOverrides.containsKey(vertexId)) {
                        LOG.info(
                                "Parallelism override for {} has been removed, falling back to regular upgrade.",
                                vertexId);
                        return FlinkService.ScalingResult.CANNOT_SCALE;
                    }
                    continue;
                }

                int p = Integer.parseInt(overrideStr);
                JobVertexResourceRequirements.Parallelism targetParallelism =
                        new JobVertexResourceRequirements.Parallelism(p, p);
                if (!currentParallelism.equals(targetParallelism)) {
                    entry.setValue(new JobVertexResourceRequirements(targetParallelism));
                    result = FlinkService.ScalingResult.SCALING_TRIGGERED;
                }
            }

            if (result == FlinkService.ScalingResult.ALREADY_SCALED) {
                LOG.info("Vertex resources requirements already match target, nothing to do...");
            } else {
                updateVertexResources(client, resource, requirements);
                eventRecorder.triggerEvent(
                        resource,
                        EventRecorder.Type.Normal,
                        EventRecorder.Reason.Scaling,
                        EventRecorder.Component.Job,
                        "In-place scaling triggered",
                        ctx.getKubernetesClient());
            }
            return result;
        } catch (Throwable t) {
            // Deliberate best effort: any failure falls back to a regular upgrade.
            LOG.error("Error while rescaling, falling back to regular upgrade", t);
            return FlinkService.ScalingResult.CANNOT_SCALE;
        }
    }

    /**
     * Checks whether the resource qualifies for in-place rescaling.
     *
     * <p>All of the following must hold: the spec defines a job, in-place scaling is enabled in
     * the config, the configured Flink version is newer than 1.17, the scheduler is the adaptive
     * one, and the job is neither in a terminal state nor reconciling.
     *
     * @param resource resource whose spec and status are inspected
     * @param observeConfig configuration the feature flag, Flink version and scheduler are read from
     * @return {@code true} only if every precondition is met
     */
    private static boolean supportsInPlaceScaling(
            AbstractFlinkResource<?, ?> resource, Configuration observeConfig) {
        if (resource.getSpec().getJob() == null) {
            return false;
        }
        // Read only after the job check, so the lookup is skipped when there is no job.
        if (!observeConfig.get(KubernetesOperatorConfigOptions.JOB_UPGRADE_INPLACE_SCALING_ENABLED)) {
            return false;
        }

        FlinkVersion flinkVersion = observeConfig.get(FlinkConfigBuilder.FLINK_VERSION);
        if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_17)) {
            LOG.debug("In-place rescaling is only available starting from Flink 1.18");
            return false;
        }

        JobManagerOptions.SchedulerType scheduler = observeConfig.get(JobManagerOptions.SCHEDULER);
        if (!scheduler.equals(JobManagerOptions.SchedulerType.Adaptive)) {
            LOG.debug("In-place rescaling is only available with the adaptive scheduler");
            return false;
        }

        CommonStatus<?> status = resource.getStatus();
        boolean terminalOrReconciling =
                ReconciliationUtils.isJobInTerminalState(status)
                        || JobStatus.RECONCILING.name().equals(status.getJobStatus().getState());
        if (terminalOrReconciling) {
            LOG.info("Job in terminal or reconciling state cannot be scaled in-place");
            return false;
        }
        return true;
    }

    /**
     * Sends the given vertex resource requirements for the resource's job to the cluster and
     * waits for the request to complete, bounded by the operator's Flink client timeout.
     *
     * @param client REST client used to send the request
     * @param resource resource whose status provides the hex job id
     * @param newReqs requirements per job vertex to submit
     * @throws Exception if the request fails or does not complete within the timeout
     */
    @VisibleForTesting
    protected void updateVertexResources(
            RestClusterClient<String> client,
            AbstractFlinkResource<?, ?> resource,
            Map<JobVertexID, JobVertexResourceRequirements> newReqs)
            throws Exception {
        JobID jobId = JobID.fromHexString(resource.getStatus().getJobStatus().getJobId());
        JobMessageParameters messageParameters = new JobMessageParameters();
        messageParameters.jobPathParameter.resolve(jobId);

        JobResourceRequirementsBody body =
                new JobResourceRequirementsBody(new JobResourceRequirements(newReqs));

        long timeoutSeconds = operatorConfig.getFlinkClientTimeout().toSeconds();
        client.sendRequest(new JobResourcesRequirementsUpdateHeaders(), messageParameters, body)
                .get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Fetches the current per-vertex resource requirements of the resource's job, waiting at
     * most the operator's Flink client timeout for the response.
     *
     * @param client REST client used to send the request
     * @param resource resource whose status provides the hex job id
     * @return the job vertex requirements reported in the response
     * @throws Exception if the request fails or does not complete within the timeout; the
     *     unchecked {@code Optional.get()} also throws if the response body carries no
     *     requirements
     */
    @VisibleForTesting
    protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
            RestClusterClient<String> client, AbstractFlinkResource<?, ?> resource)
            throws Exception {
        JobID jobId = JobID.fromHexString(resource.getStatus().getJobStatus().getJobId());
        JobMessageParameters messageParameters = new JobMessageParameters();
        messageParameters.jobPathParameter.resolve(jobId);

        long timeoutSeconds = operatorConfig.getFlinkClientTimeout().toSeconds();
        JobResourceRequirementsBody response =
                client.sendRequest(
                                new JobResourceRequirementsHeaders(),
                                messageParameters,
                                EmptyRequestBody.getInstance())
                        .get(timeoutSeconds, TimeUnit.SECONDS);

        JobResourceRequirements current = response.asJobResourceRequirements().get();
        return current.getJobVertexParallelisms();
    }

    /**
     * Checks whether an in-place rescale has finished by comparing each vertex's parallelism in
     * the job details with the override in the observe config.
     *
     * <p>When every overridden vertex runs at its override parallelism, the reconciliation state
     * is set to {@link ReconciliationState#DEPLOYED}. Vertices without an override are ignored.
     * The job-details request is bounded by the operator's Flink client timeout, consistent with
     * the other REST calls in this class.
     *
     * @param ctx resource context supplying the observe config, the resource status and the
     *     Flink service used to create the REST client
     * @return {@code true} if all overridden vertices reached their target; {@code false} if
     *     the job reports no vertices or at least one vertex still differs
     * @throws RuntimeException wrapping any failure, including a timeout or an interruption
     *     (the thread's interrupt flag is restored first)
     */
    @Override
    public boolean scalingCompleted(FlinkResourceContext<?> ctx) {
        Configuration conf = ctx.getObserveConfig();
        CommonStatus<?> status = ctx.getResource().getStatus();
        try (RestClusterClient<String> client = ctx.getFlinkService().getClusterClient(conf)) {
            JobID jobId = JobID.fromHexString(status.getJobStatus().getJobId());
            // Bound the wait so an unresponsive JobManager cannot block the caller forever.
            JobDetailsInfo jobDetailsInfo =
                    client.getJobDetails(jobId)
                            .get(
                                    operatorConfig.getFlinkClientTimeout().toSeconds(),
                                    TimeUnit.SECONDS);

            // No vertices reported: scaling is treated as not complete.
            if (jobDetailsInfo.getJobVertexInfos().isEmpty()) {
                return false;
            }

            Map<JobVertexID, Integer> currentParallelisms =
                    jobDetailsInfo.getJobVertexInfos().stream()
                            .collect(
                                    Collectors.toMap(
                                            JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                            JobDetailsInfo.JobVertexDetailsInfo::getParallelism));
            Map<String, String> parallelismOverrides =
                    conf.get(PipelineOptions.PARALLELISM_OVERRIDES);

            for (Map.Entry<JobVertexID, Integer> entry : currentParallelisms.entrySet()) {
                String override = parallelismOverrides.get(entry.getKey().toHexString());
                if (override == null) {
                    continue;
                }
                Integer overrideParallelism = Integer.valueOf(override);
                if (!overrideParallelism.equals(entry.getValue())) {
                    LOG.info(
                            "Scaling still in progress for vertex {}, {} -> {}",
                            entry.getKey(),
                            entry.getValue(),
                            overrideParallelism);
                    return false;
                }
            }

            LOG.info("All vertexes have successfully scaled");
            status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

