/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NativeFlinkService
extends AbstractFlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);

    public NativeFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
        super(kubernetesClient, configManager);
    }

    @Override
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
        LOG.info("Deploying application cluster");
        DefaultClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
        ApplicationClusterDeployer deployer = new ApplicationClusterDeployer((ClusterClientServiceLoader)clusterClientServiceLoader);
        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[]{}, jobSpec.getEntryClass());
        deployer.run(conf, applicationConfiguration);
        LOG.info("Application cluster successfully deployed");
    }

    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        this.submitClusterInternal(NativeFlinkService.removeOperatorConfigs(conf));
    }

    @Override
    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        boolean deleteClusterAfterSavepoint = !((FlinkDeploymentSpec)deployment.getSpec()).getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
        this.cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint);
    }

    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        return (PodList)((FilterWatchListDeletable)((NonNamespaceOperation)this.kubernetesClient.pods().inNamespace(namespace)).withLabels(KubernetesUtils.getJobManagerSelectors((String)clusterId))).list();
    }

    protected void submitClusterInternal(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        DefaultClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
        ClusterClientFactory kubernetesClusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(conf);
        try (ClusterDescriptor kubernetesClusterDescriptor = kubernetesClusterClientFactory.createClusterDescriptor(conf);){
            kubernetesClusterDescriptor.deploySessionCluster(kubernetesClusterClientFactory.getClusterSpecification(conf));
        }
        LOG.info("Session cluster successfully deployed");
    }

    @Override
    protected void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
        String namespace = meta.getNamespace();
        String clusterId = meta.getName();
        LOG.info("Deleting JobManager deployment {}.", (Object)(deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata"));
        ((RollableScalableResource)((NonNamespaceOperation)this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(KubernetesUtils.getDeploymentName((String)clusterId))).delete();
        if (deleteHaConfigmaps) {
            this.waitForClusterShutdown(namespace, clusterId, this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds());
            ((FilterWatchListDeletable)((NonNamespaceOperation)this.kubernetesClient.configMaps().inNamespace(namespace)).withLabels(KubernetesUtils.getConfigMapLabels((String)clusterId, (String)"high-availability"))).delete();
        }
    }
}

