/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.kubeclient;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.Endpoint;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.kubeclient.services.ServiceType;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.IntOrString;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KubernetesResource;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ListOptionsBuilder;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.OwnerReference;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodList;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Service;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ServiceBuilder;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ServiceFluent;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ServiceSpecFluent;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.PodResource;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.Resource;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.ServiceResource;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fabric8FlinkKubeClient
implements FlinkKubeClient {
    private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
    private final String clusterId;
    private final String namespace;
    private final int maxRetryAttempts;
    private final KubernetesConfigOptions.NodePortAddressType nodePortAddressType;
    private final NamespacedKubernetesClient internalClient;
    private final ExecutorService kubeClientExecutorService;
    private final AtomicReference<Deployment> masterDeploymentRef;

    public Fabric8FlinkKubeClient(Configuration flinkConfig, NamespacedKubernetesClient client, ExecutorService executorService) {
        this.clusterId = (String)flinkConfig.getOptional(KubernetesConfigOptions.CLUSTER_ID).orElseThrow(() -> new IllegalArgumentException(String.format("Configuration option '%s' is not set.", KubernetesConfigOptions.CLUSTER_ID.key())));
        this.namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
        this.maxRetryAttempts = flinkConfig.getInteger(KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES);
        this.nodePortAddressType = (KubernetesConfigOptions.NodePortAddressType)((Object)flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE));
        this.internalClient = (NamespacedKubernetesClient)Preconditions.checkNotNull((Object)client);
        this.kubeClientExecutorService = (ExecutorService)Preconditions.checkNotNull((Object)executorService);
        this.masterDeploymentRef = new AtomicReference();
    }

    /**
     * Creates the JobManager Deployment followed by its accompanying resources.
     *
     * <p>The owner reference placed on the accompanying resources is built from the Deployment
     * object returned by the create call, not from the one in the specification.
     *
     * @param kubernetesJMSpec specification holding the Deployment and accompanying resources
     */
    @Override
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
        final Deployment requestedDeployment = kubernetesJMSpec.getDeployment();
        final List<HasMetadata> ownedResources = kubernetesJMSpec.getAccompanyingResources();

        LOG.debug(
                "Start to create deployment with spec {}{}",
                System.lineSeparator(),
                KubernetesUtils.tryToGetPrettyPrintYaml(requestedDeployment));

        final Deployment createdDeployment =
                this.internalClient.resource(requestedDeployment).create();

        // Attach the owner reference first, then submit the accompanying resources.
        this.setOwnerReference(createdDeployment, ownedResources);
        this.internalClient.resourceList(ownedResources).createOrReplace();
    }

    /**
     * Asynchronously creates a TaskManager pod owned by the master Deployment.
     *
     * <p>On first use the master Deployment is fetched by the name returned from
     * {@code KubernetesUtils.getDeploymentName(clusterId)} and cached in
     * {@code masterDeploymentRef}; later calls reuse the cached object. The pod's owner references
     * are set to that Deployment before the pod is created.
     *
     * @param kubernetesPod pod to create; its internal resource is modified (owner reference set)
     * @return future that completes once the create call returns, or fails with a
     *     {@link RuntimeException} if the master Deployment cannot be found
     */
    @Override
    public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
        return CompletableFuture.runAsync(
                () -> {
                    if (this.masterDeploymentRef.get() == null) {
                        // Compute the name once so the lookup and the error message agree.
                        final String deploymentName =
                                KubernetesUtils.getDeploymentName(this.clusterId);
                        final Deployment masterDeployment =
                                this.internalClient
                                        .apps()
                                        .deployments()
                                        .withName(deploymentName)
                                        .get();
                        if (masterDeployment == null) {
                            throw new RuntimeException(
                                    "Failed to find Deployment named "
                                            + deploymentName
                                            + " in namespace "
                                            + this.namespace);
                        }
                        // Keep whichever value was stored first if another task raced us.
                        this.masterDeploymentRef.compareAndSet(null, masterDeployment);
                    }

                    final Pod pod = kubernetesPod.getInternalResource();
                    this.setOwnerReference(
                            Preconditions.checkNotNull(this.masterDeploymentRef.get()),
                            Collections.<HasMetadata>singletonList(pod));

                    LOG.debug(
                            "Start to create pod with spec {}{}",
                            System.lineSeparator(),
                            KubernetesUtils.tryToGetPrettyPrintYaml(pod));

                    this.internalClient.resource(pod).create();
                },
                this.kubeClientExecutorService);
    }

    /**
     * Asynchronously deletes the pod with the given name.
     *
     * @param podName name of the pod to delete
     * @return future that completes when the delete call has returned
     */
    @Override
    public CompletableFuture<Void> stopPod(String podName) {
        final Runnable deletePod = () -> this.internalClient.pods().withName(podName).delete();
        return CompletableFuture.runAsync(deletePod, this.kubeClientExecutorService);
    }

    /**
     * Resolves the REST endpoint of the cluster from its external service.
     *
     * <p>The service is looked up under the name given by
     * {@code ExternalServiceDecorator.getExternalServiceName(clusterId)}; the endpoint is then
     * derived by the service type that {@code ServiceType.classify} reports for it.
     *
     * @param clusterId id of the cluster whose REST endpoint is requested
     * @return the endpoint, or an empty Optional when the service does not exist
     */
    @Override
    public Optional<Endpoint> getRestEndpoint(String clusterId) {
        final String restServiceName = ExternalServiceDecorator.getExternalServiceName(clusterId);
        final Optional<KubernetesService> restService = this.getService(restServiceName);
        if (restService.isPresent()) {
            final Service service = restService.get().getInternalResource();
            final KubernetesConfigOptions.ServiceExposedType exposedType =
                    ServiceType.classify(service);
            return exposedType
                    .serviceType()
                    .getRestEndpoint(service, this.internalClient, this.nodePortAddressType);
        }
        return Optional.empty();
    }

    /**
     * Lists the pods matching the given labels, requesting resource version {@code "0"}.
     *
     * @param labels label selector the pods must match
     * @return a new list with one {@link KubernetesPod} per returned pod; empty when the listing
     *     yields no items
     */
    @Override
    public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
        final List<Pod> items =
                this.internalClient
                        .pods()
                        .withLabels(labels)
                        .list(new ListOptionsBuilder().withResourceVersion("0").build())
                        .getItems();

        final List<KubernetesPod> pods = new ArrayList<>();
        if (items != null) {
            for (Pod item : items) {
                pods.add(new KubernetesPod(item));
            }
        }
        return pods;
    }

    /**
     * Deletes the Deployment of the given cluster with cascading deletion enabled.
     *
     * @param clusterId id of the cluster whose Deployment is deleted
     */
    @Override
    public void stopAndCleanupCluster(String clusterId) {
        final String deploymentName = KubernetesUtils.getDeploymentName(clusterId);
        this.internalClient
                .apps()
                .deployments()
                .withName(deploymentName)
                .cascading(true)
                .delete();
    }

    /**
     * Fetches the service with the given name.
     *
     * @param serviceName name of the service to look up
     * @return the wrapped service, or an empty Optional (after a debug log) when it is absent
     */
    @Override
    public Optional<KubernetesService> getService(String serviceName) {
        final Service found = this.internalClient.services().withName(serviceName).get();
        if (found != null) {
            return Optional.of(new KubernetesService(found));
        }
        LOG.debug("Service {} does not exist", serviceName);
        return Optional.empty();
    }

    /**
     * Starts a watch on the pods matching {@code labels} and waits for it to be established.
     *
     * <p>The watch is opened on the client executor with resource version {@code "0"}. The attempt
     * is handed to {@code FutureUtils.retry} with {@code maxRetryAttempts} and a predicate that
     * accepts failures whose cause chain contains a {@link KubernetesClientException}. The calling
     * thread blocks on the resulting future.
     *
     * @param labels label selector of the pods to watch
     * @param podCallbackHandler handler passed to the {@link KubernetesPodsWatcher}
     * @return the established watch
     * @throws Exception whatever {@code CompletableFuture.get()} throws when the watch cannot be
     *     created
     */
    @Override
    public KubernetesWatch watchPodsAndDoCallback(
            Map<String, String> labels,
            FlinkKubeClient.WatchCallbackHandler<KubernetesPod> podCallbackHandler)
            throws Exception {
        final CompletableFuture<KubernetesWatch> watchFuture =
                FutureUtils.retry(
                        () ->
                                CompletableFuture.supplyAsync(
                                        () ->
                                                new KubernetesWatch(
                                                        this.internalClient
                                                                .pods()
                                                                .withLabels(labels)
                                                                .withResourceVersion("0")
                                                                .watch(
                                                                        new KubernetesPodsWatcher(
                                                                                podCallbackHandler))),
                                        this.kubeClientExecutorService),
                        this.maxRetryAttempts,
                        failure ->
                                ExceptionUtils.findThrowable(
                                                failure, KubernetesClientException.class)
                                        .isPresent(),
                        this.kubeClientExecutorService);
        return watchFuture.get();
    }

    /**
     * Creates a leader elector that uses this client's underlying fabric8 client.
     *
     * @param leaderElectionConfiguration configuration forwarded to the elector
     * @param leaderCallbackHandler callback handler forwarded to the elector
     * @return a new {@link KubernetesLeaderElector}
     */
    @Override
    public KubernetesLeaderElector createLeaderElector(
            KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
            KubernetesLeaderElector.LeaderCallbackHandler leaderCallbackHandler) {
        final KubernetesLeaderElector elector =
                new KubernetesLeaderElector(
                        this.internalClient, leaderElectionConfiguration, leaderCallbackHandler);
        return elector;
    }

    /**
     * Asynchronously creates the given ConfigMap.
     *
     * @param configMap ConfigMap to create
     * @return future that completes when the create call returns; on failure it completes
     *     exceptionally with a {@link CompletionException} wrapping a {@link KubernetesException}
     *     that names the ConfigMap
     */
    @Override
    public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
        final String configMapName = configMap.getName();
        final CompletableFuture<Void> creation =
                CompletableFuture.runAsync(
                        () ->
                                this.internalClient
                                        .resource(configMap.getInternalResource())
                                        .create(),
                        this.kubeClientExecutorService);
        return creation.exceptionally(
                failure -> {
                    throw new CompletionException(
                            new KubernetesException(
                                    "Failed to create ConfigMap " + configMapName, failure));
                });
    }

    /**
     * Fetches the ConfigMap with the given name.
     *
     * @param name name of the ConfigMap
     * @return the wrapped ConfigMap, or an empty Optional when the lookup returns {@code null}
     */
    @Override
    public Optional<KubernetesConfigMap> getConfigMap(String name) {
        final ConfigMap found = this.internalClient.configMaps().withName(name).get();
        return Optional.ofNullable(found).map(KubernetesConfigMap::new);
    }

    /**
     * Runs a check-and-update of a ConfigMap through {@code FutureUtils.retry}.
     *
     * <p>Each attempt is delegated to {@code attemptCheckAndUpdateConfigMap}. The retry predicate
     * accepts failures whose cause chain contains a {@link KubernetesClientException}, and
     * {@code maxRetryAttempts} is passed as the retry count.
     *
     * @param configMapName name of the ConfigMap to update
     * @param updateFunction function that receives the current ConfigMap and returns the
     *     ConfigMap to write, or an empty Optional to skip the update
     * @return future yielding {@code true} if an update was written and {@code false} if the
     *     function returned an empty Optional
     */
    @Override
    public CompletableFuture<Boolean> checkAndUpdateConfigMap(
            String configMapName,
            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> updateFunction) {
        final CompletableFuture<Boolean> result =
                FutureUtils.retry(
                        () -> this.attemptCheckAndUpdateConfigMap(configMapName, updateFunction),
                        this.maxRetryAttempts,
                        failure ->
                                ExceptionUtils.findThrowable(
                                                failure, KubernetesClientException.class)
                                        .isPresent(),
                        this.kubeClientExecutorService);
        return result;
    }

    /**
     * Performs a single asynchronous check-and-update attempt on a ConfigMap.
     *
     * <p>The current ConfigMap is fetched and handed to {@code updateFunction}. If the function
     * returns a ConfigMap, it is written with {@code lockResourceVersion().update()}.
     *
     * @param configMapName name of the ConfigMap to update
     * @param updateFunction function producing the ConfigMap to write, or an empty Optional to
     *     skip the update
     * @return future yielding {@code true} after a successful write and {@code false} when no
     *     update was requested. It fails with a {@link CompletionException} wrapping a
     *     {@link KubernetesException} when the ConfigMap does not exist, or wrapping a
     *     {@link PossibleInconsistentStateException} when the write throws.
     */
    private CompletableFuture<Boolean> attemptCheckAndUpdateConfigMap(
            String configMapName,
            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> updateFunction) {
        return CompletableFuture.supplyAsync(
                () -> {
                    final KubernetesConfigMap configMap =
                            this.getConfigMap(configMapName)
                                    .orElseThrow(
                                            () ->
                                                    new CompletionException(
                                                            new KubernetesException(
                                                                    "Cannot retry checkAndUpdateConfigMap with configMap "
                                                                            + configMapName
                                                                            + " because it does not exist.")));

                    final Optional<KubernetesConfigMap> maybeUpdate =
                            updateFunction.apply(configMap);
                    if (!maybeUpdate.isPresent()) {
                        return false;
                    }

                    final KubernetesConfigMap updatedConfigMap = maybeUpdate.get();
                    try {
                        this.internalClient
                                .resource(updatedConfigMap.getInternalResource())
                                .lockResourceVersion()
                                .update();
                        return true;
                    } catch (Throwable throwable) {
                        // Log the data that was actually submitted, together with the cause.
                        LOG.debug(
                                "Failed to update ConfigMap {} with data {}. Trying again.",
                                updatedConfigMap.getName(),
                                updatedConfigMap.getData(),
                                throwable);
                        // The write may or may not have been applied on the server side.
                        throw new CompletionException(
                                new PossibleInconsistentStateException(throwable));
                    }
                },
                this.kubeClientExecutorService);
    }

    /**
     * Asynchronously deletes every ConfigMap matching the given labels.
     *
     * @param labels label selector of the ConfigMaps to delete
     * @return future that completes when the delete call has returned
     */
    @Override
    public CompletableFuture<Void> deleteConfigMapsByLabels(Map<String, String> labels) {
        final Runnable deleteMatching =
                () -> this.internalClient.configMaps().withLabels(labels).delete();
        return CompletableFuture.runAsync(deleteMatching, this.kubeClientExecutorService);
    }

    /**
     * Asynchronously deletes the ConfigMap with the given name.
     *
     * @param configMapName name of the ConfigMap to delete
     * @return future that completes when the delete call has returned
     */
    @Override
    public CompletableFuture<Void> deleteConfigMap(String configMapName) {
        return CompletableFuture.runAsync(
                () -> {
                    this.internalClient.configMaps().withName(configMapName).delete();
                },
                this.kubeClientExecutorService);
    }

    /**
     * Creates a shared ConfigMap watcher for the given labels.
     *
     * @param labels labels passed to the {@link KubernetesConfigMapSharedInformer}
     * @return a new informer built on this client's underlying fabric8 client
     */
    @Override
    public KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(
            Map<String, String> labels) {
        final KubernetesConfigMapSharedInformer informer =
                new KubernetesConfigMapSharedInformer(this.internalClient, labels);
        return informer;
    }

    /**
     * Closes the underlying fabric8 client and shuts down the client executor.
     *
     * <p>The executor is shut down with a 5 second grace period even if closing the client
     * throws, so its threads are not leaked; the exception from the client still propagates.
     */
    @Override
    public void close() {
        try {
            this.internalClient.close();
        } finally {
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, this.kubeClientExecutorService);
        }
    }

    /**
     * Loads a pod definition from a template file.
     *
     * @param file pod template file
     * @return the pod parsed from the file, wrapped as a {@link KubernetesPod}
     * @throws FlinkRuntimeException if the file does not exist
     */
    @Override
    public KubernetesPod loadPodFromTemplateFile(File file) {
        if (file.exists()) {
            final Pod templatePod = this.internalClient.pods().load(file).item();
            return new KubernetesPod(templatePod);
        }
        throw new FlinkRuntimeException(
                String.format("Pod template file %s does not exist.", file));
    }

    /**
     * Asynchronously sets the target port of a named port on a service.
     *
     * <p>Nothing happens if the service does not exist. Otherwise the port whose name equals
     * {@code portName} is edited and the modified service is sent as an update. Ports without a
     * name are treated as non-matching rather than causing a {@link NullPointerException}.
     *
     * @param serviceName name of the service to modify
     * @param portName name of the service port whose target port is changed
     * @param targetPort new target port value
     * @return future that completes when the update has been sent or the service was absent
     */
    @Override
    public CompletableFuture<Void> updateServiceTargetPort(
            String serviceName, String portName, int targetPort) {
        LOG.debug("Update {} target port to {}", portName, targetPort);
        return CompletableFuture.runAsync(
                () ->
                        this.getService(serviceName)
                                .ifPresent(
                                        service -> {
                                            final Service updatedService =
                                                    new ServiceBuilder(
                                                                    service.getInternalResource())
                                                            .editSpec()
                                                            .editMatchingPort(
                                                                    servicePortBuilder -> {
                                                                        // A port may carry no name; skip it
                                                                        // instead of dereferencing null.
                                                                        final String existingName =
                                                                                servicePortBuilder
                                                                                        .build()
                                                                                        .getName();
                                                                        return existingName != null
                                                                                && existingName.equals(
                                                                                        portName);
                                                                    })
                                                            .withTargetPort(new IntOrString(targetPort))
                                                            .endPort()
                                                            .endSpec()
                                                            .build();
                                            this.internalClient.resource(updatedService).update();
                                        }),
                this.kubeClientExecutorService);
    }

    /**
     * Makes the given Deployment the sole owner of each resource.
     *
     * <p>The owner reference copies the Deployment's name, API version, UID and kind and has both
     * {@code controller} and {@code blockOwnerDeletion} set to {@code true}. Any owner references
     * already present on a resource are replaced.
     *
     * @param deployment Deployment that becomes the owner
     * @param resources resources whose metadata is modified in place
     */
    private void setOwnerReference(Deployment deployment, List<HasMetadata> resources) {
        final OwnerReference ownerReference =
                new OwnerReferenceBuilder()
                        .withName(deployment.getMetadata().getName())
                        .withApiVersion(deployment.getApiVersion())
                        .withUid(deployment.getMetadata().getUid())
                        .withKind(deployment.getKind())
                        .withController(true)
                        .withBlockOwnerDeletion(true)
                        .build();
        for (HasMetadata resource : resources) {
            resource.getMetadata().setOwnerReferences(Collections.singletonList(ownerReference));
        }
    }
}

