/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesWorkerNode;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.util.ResourceManagerUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

public class KubernetesResourceManagerDriver
extends AbstractResourceManagerDriver<KubernetesWorkerNode> {
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
    private final String clusterId;
    private final String webInterfaceUrl;
    private final FlinkKubeClient flinkKubeClient;
    private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures;
    private long currentMaxAttemptId = 0L;
    private long currentMaxPodId = 0L;
    private Optional<KubernetesWatch> podsWatchOpt;
    private volatile boolean running;
    private FlinkPod taskManagerPodTemplate;

    public KubernetesResourceManagerDriver(Configuration flinkConfig, FlinkKubeClient flinkKubeClient, KubernetesResourceManagerDriverConfiguration configuration) {
        super(flinkConfig, GlobalConfiguration.loadConfiguration());
        this.clusterId = (String)Preconditions.checkNotNull((Object)configuration.getClusterId());
        this.webInterfaceUrl = configuration.getWebInterfaceUrl();
        this.flinkKubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)flinkKubeClient);
        this.requestResourceFutures = new HashMap<String, CompletableFuture<KubernetesWorkerNode>>();
        this.running = false;
    }

    protected void initializeInternal() throws Exception {
        this.podsWatchOpt = this.watchTaskManagerPods();
        File podTemplateFile = KubernetesUtils.getTaskManagerPodTemplateFileInPod();
        this.taskManagerPodTemplate = podTemplateFile.exists() ? KubernetesUtils.loadPodFromTemplateFile(this.flinkKubeClient, podTemplateFile, "flink-main-container") : new FlinkPod.Builder().build();
        this.updateKubernetesServiceTargetPortIfNecessary();
        this.recoverWorkerNodesFromPreviousAttempts();
        this.running = true;
    }

    public void terminate() throws Exception {
        if (!this.running) {
            return;
        }
        this.running = false;
        Exception exception = null;
        try {
            this.podsWatchOpt.ifPresent(KubernetesWatch::close);
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            this.flinkKubeClient.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            throw exception;
        }
    }

    public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
        this.log.info("Deregistering Flink Kubernetes cluster, clusterId: {}, diagnostics: {}", (Object)this.clusterId, (Object)(optionalDiagnostics == null ? "" : optionalDiagnostics));
        this.flinkKubeClient.stopAndCleanupCluster(this.clusterId);
    }

    public CompletableFuture<KubernetesWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        KubernetesTaskManagerParameters parameters = this.createKubernetesTaskManagerParameters(taskExecutorProcessSpec, this.getBlockedNodeRetriever().getAllBlockedNodeIds());
        KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(this.taskManagerPodTemplate, parameters);
        String podName = taskManagerPod.getName();
        CompletableFuture<KubernetesWorkerNode> requestResourceFuture = new CompletableFuture<KubernetesWorkerNode>();
        this.requestResourceFutures.put(podName, requestResourceFuture);
        this.log.info("Creating new TaskManager pod with name {} and resource <{},{}>.", new Object[]{podName, parameters.getTaskManagerMemoryMB(), parameters.getTaskManagerCPU()});
        CompletableFuture<Void> createPodFuture = this.flinkKubeClient.createTaskManagerPod(taskManagerPod);
        FutureUtils.assertNoException((CompletableFuture)createPodFuture.handleAsync((ignore, exception) -> {
            if (exception != null) {
                this.log.warn("Could not create pod {}, exception: {}", (Object)podName, exception);
                CompletableFuture<KubernetesWorkerNode> future = this.requestResourceFutures.remove(taskManagerPod.getName());
                if (future != null) {
                    future.completeExceptionally((Throwable)exception);
                }
            } else {
                this.log.info("Pod {} is created.", (Object)podName);
            }
            return null;
        }, (Executor)this.getMainThreadExecutor()));
        return requestResourceFuture;
    }

    public void releaseResource(KubernetesWorkerNode worker) {
        String podName = worker.getResourceID().toString();
        this.log.info("Stopping TaskManager pod {}.", (Object)podName);
        this.stopPod(podName);
    }

    private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
        List<KubernetesPod> podList = this.flinkKubeClient.getPodsWithLabels(KubernetesUtils.getTaskManagerSelectors(this.clusterId));
        ArrayList<KubernetesWorkerNode> recoveredWorkers = new ArrayList<KubernetesWorkerNode>();
        for (KubernetesPod pod : podList) {
            KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
            long attempt = worker.getAttempt();
            if (attempt > this.currentMaxAttemptId) {
                this.currentMaxAttemptId = attempt;
            }
            if (pod.isTerminated() || !pod.isScheduled()) {
                this.stopPod(pod.getName());
                continue;
            }
            recoveredWorkers.add(worker);
        }
        this.log.info("Recovered {} pods from previous attempts, current attempt id is {}.", (Object)recoveredWorkers.size(), (Object)(++this.currentMaxAttemptId));
        this.getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
    }

    private void updateKubernetesServiceTargetPortIfNecessary() throws Exception {
        if (!KubernetesUtils.isHostNetwork(this.flinkConfig)) {
            return;
        }
        int restPort = ResourceManagerUtils.parseRestBindPortFromWebInterfaceUrl((String)this.webInterfaceUrl);
        Preconditions.checkArgument((restPort > 0 ? 1 : 0) != 0, (Object)("Failed to parse rest port from " + this.webInterfaceUrl));
        String restServiceName = ExternalServiceDecorator.getExternalServiceName(this.clusterId);
        this.flinkKubeClient.updateServiceTargetPort(restServiceName, "rest", restPort).get();
        if (!HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)this.flinkConfig)) {
            String internalServiceName = InternalServiceDecorator.getInternalServiceName(this.clusterId);
            this.flinkKubeClient.updateServiceTargetPort(internalServiceName, "blobserver", Integer.parseInt(this.flinkConfig.getString(BlobServerOptions.PORT))).get();
            this.flinkKubeClient.updateServiceTargetPort(internalServiceName, "jobmanager-rpc", this.flinkConfig.getInteger(JobManagerOptions.PORT)).get();
        }
    }

    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(TaskExecutorProcessSpec taskExecutorProcessSpec, Set<String> blockedNodes) {
        String podName = String.format(TASK_MANAGER_POD_FORMAT, this.clusterId, this.currentMaxAttemptId, ++this.currentMaxPodId);
        ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create((Configuration)this.flinkConfig, (TaskExecutorProcessSpec)taskExecutorProcessSpec);
        Configuration taskManagerConfig = new Configuration(this.flinkConfig);
        taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, (Object)podName);
        String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString((Configuration)this.flinkClientConfig, (Configuration)taskManagerConfig);
        String jvmMemOpts = ProcessMemoryUtils.generateJvmParametersStr((ProcessMemorySpec)taskExecutorProcessSpec);
        return new KubernetesTaskManagerParameters(this.flinkConfig, podName, dynamicProperties, jvmMemOpts, taskManagerParameters, ExternalResourceUtils.getExternalResourceConfigurationKeys((Configuration)this.flinkConfig, (String)"kubernetes.config-key"), blockedNodes);
    }

    private void handlePodEventsInMainThread(List<KubernetesPod> pods, PodEvent podEvent) {
        this.getMainThreadExecutor().execute(() -> {
            for (KubernetesPod pod : pods) {
                if (podEvent == PodEvent.DELETED || pod.isTerminated()) {
                    this.onPodTerminated(pod);
                    continue;
                }
                if (!pod.isScheduled()) continue;
                this.onPodScheduled(pod);
            }
        });
    }

    private void onPodScheduled(KubernetesPod pod) {
        String podName = pod.getName();
        CompletableFuture<KubernetesWorkerNode> requestResourceFuture = this.requestResourceFutures.remove(podName);
        if (requestResourceFuture == null) {
            this.log.debug("Ignore TaskManager pod that is already added: {}", (Object)podName);
            return;
        }
        this.log.info("Received new TaskManager pod: {}", (Object)podName);
        requestResourceFuture.complete(new KubernetesWorkerNode(new ResourceID(podName)));
    }

    private void onPodTerminated(KubernetesPod pod) {
        String podName = pod.getName();
        this.log.debug("TaskManager pod {} is terminated.", (Object)podName);
        CompletableFuture<KubernetesWorkerNode> requestResourceFuture = this.requestResourceFutures.remove(podName);
        if (requestResourceFuture != null) {
            this.log.warn("Pod {} is terminated before being scheduled.", (Object)podName);
            requestResourceFuture.completeExceptionally((Throwable)new FlinkException("Pod is terminated."));
        }
        this.getResourceEventHandler().onWorkerTerminated(new ResourceID(podName), pod.getTerminatedDiagnostics());
        this.stopPod(podName);
    }

    private void stopPod(String podName) {
        this.flinkKubeClient.stopPod(podName).whenComplete((ignore, throwable) -> {
            if (throwable != null) {
                this.log.warn("Could not remove TaskManager pod {}, exception: {}", (Object)podName, throwable);
            }
        });
    }

    private Optional<KubernetesWatch> watchTaskManagerPods() throws Exception {
        return Optional.of(this.flinkKubeClient.watchPodsAndDoCallback(KubernetesUtils.getTaskManagerSelectors(this.clusterId), new PodCallbackHandlerImpl()));
    }

    private static enum PodEvent {
        ADDED,
        MODIFIED,
        DELETED,
        ERROR;

    }

    private class PodCallbackHandlerImpl
    implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
        private PodCallbackHandlerImpl() {
        }

        @Override
        public void onAdded(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(pods, PodEvent.ADDED);
        }

        @Override
        public void onModified(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(pods, PodEvent.MODIFIED);
        }

        @Override
        public void onDeleted(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(pods, PodEvent.DELETED);
        }

        @Override
        public void onError(List<KubernetesPod> pods) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(pods, PodEvent.ERROR);
        }

        @Override
        public void handleError(Throwable throwable) {
            if (throwable instanceof KubernetesTooOldResourceVersionException) {
                KubernetesResourceManagerDriver.this.getMainThreadExecutor().execute(() -> {
                    if (KubernetesResourceManagerDriver.this.running) {
                        KubernetesResourceManagerDriver.this.podsWatchOpt.ifPresent(KubernetesWatch::close);
                        KubernetesResourceManagerDriver.this.log.info("Creating a new watch on TaskManager pods.");
                        try {
                            KubernetesResourceManagerDriver.this.podsWatchOpt = KubernetesResourceManagerDriver.this.watchTaskManagerPods();
                        }
                        catch (Exception e) {
                            KubernetesResourceManagerDriver.this.getResourceEventHandler().onError((Throwable)e);
                        }
                    }
                });
            } else {
                KubernetesResourceManagerDriver.this.getResourceEventHandler().onError(throwable);
            }
        }
    }
}

