/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AbstractFlinkService} implementation that submits clusters through a
 * {@link KubernetesStandaloneClusterDescriptor} and manages the resulting JobManager and
 * TaskManager Kubernetes Deployments directly.
 */
public class StandaloneFlinkService
extends AbstractFlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(StandaloneFlinkService.class);

    /**
     * Creates the service.
     *
     * @param kubernetesClient client handed to the parent class and used here for pod,
     *     Deployment and ConfigMap operations
     * @param configManager configuration manager handed to the parent class; used here to read
     *     the cluster shutdown timeout
     */
    public StandaloneFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
        super(kubernetesClient, configManager);
    }

    /**
     * Deploys a standalone cluster in {@link Mode#APPLICATION} mode.
     *
     * @param jobSpec job specification; not read by this implementation
     * @param conf configuration; it is passed through {@code removeOperatorConfigs} before
     *     submission
     * @throws Exception if the submission fails
     */
    @Override
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
        LOG.info("Deploying application cluster");
        this.submitClusterInternal(removeOperatorConfigs(conf), Mode.APPLICATION);
        LOG.info("Application cluster successfully deployed");
    }

    /**
     * Deploys a standalone cluster in {@link Mode#SESSION} mode.
     *
     * @param conf configuration; it is passed through {@code removeOperatorConfigs} before
     *     submission
     * @throws Exception if the submission fails
     */
    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        this.submitClusterInternal(removeOperatorConfigs(conf), Mode.SESSION);
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Cancels the job by delegating to the four-argument {@code cancelJob} overload with the
     * trailing flag set to {@code true}.
     *
     * <p>NOTE(review): the meaning of that flag is defined by the overload, which is not
     * declared in this class - confirm against the parent class.
     *
     * @param deployment deployment whose job is cancelled
     * @param upgradeMode upgrade mode forwarded to the overload
     * @param conf configuration forwarded to the overload
     * @throws Exception if the delegated cancellation fails
     */
    @Override
    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) throws Exception {
        this.cancelJob(deployment, upgradeMode, conf, true);
    }

    /**
     * Lists the pods in {@code namespace} that carry the JobManager selector labels of the
     * given cluster.
     *
     * @param namespace namespace to search
     * @param clusterId cluster id used to build the label selector
     * @return the matching pods
     */
    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        return this.kubernetesClient
                .pods()
                .inNamespace(namespace)
                .withLabels(StandaloneKubernetesUtils.getJobManagerSelectors(clusterId))
                .list();
    }

    /**
     * Creates a {@link FlinkStandaloneKubeClient} backed by a new fixed-size thread pool whose
     * size is read from {@link KubernetesConfigOptions#KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE}.
     *
     * <p>NOTE(review): the executor is not shut down in this class; presumably the returned
     * client owns it and releases it on close - confirm.
     *
     * @param configuration configuration used for the pool size and the client itself
     * @return the newly created client
     */
    @VisibleForTesting
    protected FlinkStandaloneKubeClient createNamespacedKubeClient(Configuration configuration) {
        int poolSize = configuration.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
        ExecutorService executorService =
                Executors.newFixedThreadPool(
                        poolSize,
                        new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service"));
        return Fabric8FlinkStandaloneKubeClient.create(configuration, executorService);
    }

    /**
     * Submits a cluster in the requested mode using a freshly created kube client and cluster
     * descriptor. The descriptor is closed when the submission returns or throws.
     *
     * @param conf configuration used for the client, the descriptor and the cluster
     *     specification
     * @param mode deployment mode
     * @throws ClusterDeploymentException if the descriptor fails to deploy the cluster
     * @throws UnsupportedOperationException if {@code mode} is neither {@link Mode#APPLICATION}
     *     nor {@link Mode#SESSION}
     */
    protected void submitClusterInternal(Configuration conf, Mode mode) throws ClusterDeploymentException {
        FlinkStandaloneKubeClient client = this.createNamespacedKubeClient(conf);
        try (KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor =
                new KubernetesStandaloneClusterDescriptor(conf, client)) {
            switch (mode) {
                case APPLICATION:
                    kubernetesClusterDescriptor.deployApplicationCluster(
                            this.getClusterSpecification(conf),
                            ApplicationConfiguration.fromConfiguration(conf));
                    break;
                case SESSION:
                    kubernetesClusterDescriptor.deploySessionCluster(this.getClusterSpecification(conf));
                    break;
                default:
                    throw new UnsupportedOperationException(
                            String.format("Unsupported running mode: %s", mode));
            }
        }
    }

    /** Builds the cluster specification for {@code conf} via {@link KubernetesClusterClientFactory}. */
    private ClusterSpecification getClusterSpecification(Configuration conf) {
        return new KubernetesClusterClientFactory().getClusterSpecification(conf);
    }

    /**
     * Deletes the JobManager Deployment and then the TaskManager Deployment of the cluster.
     * When {@code deleteHaConfigmaps} is set, it additionally waits for the cluster to shut
     * down (bounded by the operator's configured shutdown timeout) and then deletes the
     * ConfigMaps labelled as high-availability for this cluster.
     *
     * @param meta metadata providing the cluster name and namespace
     * @param deleteHaConfigmaps whether the high-availability ConfigMaps are removed as well
     */
    @Override
    protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
        String clusterId = meta.getName();
        String namespace = meta.getNamespace();

        LOG.info("Deleting Flink Standalone cluster JM resources");
        this.kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
                .delete();

        LOG.info("Deleting Flink Standalone cluster TM resources");
        this.kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
                .delete();

        if (deleteHaConfigmaps) {
            // Wait for shutdown before removing the HA ConfigMaps (ordering kept from the
            // original implementation).
            this.waitForClusterShutdown(
                    namespace,
                    clusterId,
                    this.configManager
                            .getOperatorConfiguration()
                            .getFlinkShutdownClusterTimeout()
                            .toSeconds());
            this.kubernetesClient
                    .configMaps()
                    .inNamespace(namespace)
                    .withLabels(KubernetesUtils.getConfigMapLabels(clusterId, "high-availability"))
                    .delete();
        }
    }

    /**
     * Scales the TaskManager Deployment to the replica count configured under
     * {@link StandaloneKubernetesConfigOptionsInternal#KUBERNETES_TASKMANAGER_REPLICAS}.
     *
     * @param meta metadata providing the cluster name and namespace
     * @param jobSpec job specification; not read by this implementation
     * @param conf configuration providing the scheduler mode and the desired replica count
     * @return {@code false} if {@link JobManagerOptions#SCHEDULER_MODE} is unset or the
     *     TaskManager Deployment cannot be found; {@code true} otherwise, whether or not a
     *     scale request had to be issued
     */
    @Override
    public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
        if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
            LOG.info("Reactive scaling is not enabled");
            return false;
        }

        String clusterId = meta.getName();
        String namespace = meta.getNamespace();
        String name = StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId);
        RollableScalableResource<Deployment> deployment =
                this.kubernetesClient.apps().deployments().inNamespace(namespace).withName(name);

        // Fetch the Deployment once and reuse it: a second get() could return null if the
        // Deployment disappears in between, which previously risked a NullPointerException.
        Deployment currentDeployment = deployment == null ? null : deployment.get();
        if (currentDeployment == null) {
            LOG.warn("TM Deployment ({}) not found", name);
            return false;
        }

        Integer actualReplicas = currentDeployment.getSpec().getReplicas();
        Integer desiredReplicas =
                conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);

        // Compare by value. Using ==/!= on boxed Integers compares references, which reports
        // equal replica counts as different once they fall outside the Integer cache range.
        boolean replicasMatch =
                actualReplicas == null
                        ? desiredReplicas == null
                        : actualReplicas.equals(desiredReplicas);

        if (!replicasMatch) {
            LOG.info("Scaling TM replicas: actual({}) -> desired({})", actualReplicas, desiredReplicas);
            deployment.scale(desiredReplicas.intValue());
        } else {
            LOG.info("Not scaling TM replicas: actual({}) == desired({})", actualReplicas, desiredReplicas);
        }
        return true;
    }
}

