/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.LambdaMetafactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.service.CustomCheckpointingStatisticsHeaders;
import org.apache.flink.kubernetes.operator.service.CustomDashboardConfiguration;
import org.apache.flink.kubernetes.operator.service.CustomDashboardConfigurationHeaders;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFlinkService
implements FlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
    private static final String EMPTY_JAR_FILENAME = "empty.jar";
    protected final KubernetesClient kubernetesClient;
    protected final FlinkConfigManager configManager;
    private final ExecutorService executorService;
    protected final ArtifactManager artifactManager;
    private final String emptyJar;

    public AbstractFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
        this.kubernetesClient = kubernetesClient;
        this.configManager = configManager;
        this.artifactManager = new ArtifactManager(configManager);
        this.executorService = Executors.newFixedThreadPool(4, (ThreadFactory)new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
        this.emptyJar = this.createEmptyJar();
    }

    protected abstract PodList getJmPodList(String var1, String var2);

    protected abstract void deployApplicationCluster(JobSpec var1, Configuration var2) throws Exception;

    @Override
    public KubernetesClient getKubernetesClient() {
        return this.kubernetesClient;
    }

    @Override
    public void submitApplicationCluster(JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
        LOG.info("Deploying application cluster{}", (Object)(requireHaMetadata ? " requiring last-state from HA metadata" : ""));
        if (FlinkUtils.isKubernetesHAActivated(conf)) {
            String clusterId = (String)conf.get(KubernetesConfigOptions.CLUSTER_ID);
            String namespace = (String)conf.get(KubernetesConfigOptions.NAMESPACE);
            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, this.kubernetesClient);
        }
        if (requireHaMetadata) {
            this.validateHaMetadataExists(conf);
        }
        this.deployApplicationCluster(jobSpec, AbstractFlinkService.removeOperatorConfigs(conf));
    }

    @Override
    public boolean isHaMetadataAvailable(Configuration conf) {
        return FlinkUtils.isHaMetadataAvailable(conf, this.kubernetesClient);
    }

    @Override
    public JobID submitJobToSessionCluster(ObjectMeta meta, FlinkSessionJobSpec spec, Configuration conf, @Nullable String savepoint) throws Exception {
        JobID jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
        this.runJar(spec.getJob(), jobID, this.uploadJar(meta, spec, conf), conf, savepoint);
        LOG.info("Submitted job: {} to session cluster.", (Object)jobID);
        return jobID;
    }

    @Override
    public boolean isJobManagerPortReady(Configuration config) {
        URI uri;
        try (ClusterClient<String> clusterClient = this.getClusterClient(config);){
            uri = URI.create(clusterClient.getWebInterfaceURL());
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
        InetSocketAddress socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
        Socket socket = new Socket();
        try {
            socket.connect(socketAddress, 1000);
            socket.close();
            return true;
        }
        catch (IOException e) {
            return false;
        }
    }

    @Override
    public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            Collection collection = (Collection)((CompletableFuture)clusterClient.sendRequest((MessageHeaders)JobsOverviewHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance()).thenApply(AbstractFlinkService::toJobStatusMessage)).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
            return collection;
        }
    }

    @Override
    public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
        try (ClusterClient<String> clusterClient = this.getClusterClient(conf);){
            JobResult jobResult = (JobResult)clusterClient.requestJobResult(jobID).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
            return jobResult;
        }
    }

    /*
     * Unable to fully structure code
     * Could not resolve type clashes
     */
    protected void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf, boolean deleteClusterAfterSavepoint) throws Exception {
        deploymentStatus = (FlinkDeploymentStatus)deployment.getStatus();
        jobIdString = deploymentStatus.getJobStatus().getJobId();
        jobId = jobIdString != null ? JobID.fromHexString((String)jobIdString) : null;
        savepointOpt /* !! */  = Optional.empty();
        savepointFormatType = SavepointUtils.getSavepointFormatType(conf);
        clusterClient = this.getClusterClient(conf);
        try {
            clusterId = (String)clusterClient.getClusterId();
            switch (1.$SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[upgradeMode.ordinal()]) {
                case 1: {
                    if (ReconciliationUtils.isJobRunning((CommonStatus)deployment.getStatus())) {
                        AbstractFlinkService.LOG.info("Job is running, cancelling job.");
                        try {
                            clusterClient.cancel((JobID)Preconditions.checkNotNull((Object)jobId)).get(this.configManager.getOperatorConfiguration().getFlinkCancelJobTimeout().toSeconds(), TimeUnit.SECONDS);
                            AbstractFlinkService.LOG.info("Job successfully cancelled.");
                        }
                        catch (Exception e) {
                            AbstractFlinkService.LOG.error("Could not shut down cluster gracefully, deleting...", (Throwable)e);
                        }
                    }
                    this.deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
                    ** break;
lbl22:
                    // 1 sources

                    break;
                }
                case 2: {
                    savepointDirectory = (String)Preconditions.checkNotNull((Object)((String)conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)));
                    timeout = ((Duration)conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)).getSeconds();
                    if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
                        try {
                            AbstractFlinkService.LOG.info("Suspending job with savepoint.");
                            savepoint = (String)clusterClient.stopWithSavepoint((JobID)Preconditions.checkNotNull((Object)jobId), false, savepointDirectory, (org.apache.flink.core.execution.SavepointFormatType)(((FlinkVersion)conf.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14) != false ? savepointFormatType : null)).get(timeout, TimeUnit.SECONDS);
                            savepointOpt /* !! */  = Optional.of(savepoint);
                            AbstractFlinkService.LOG.info("Job successfully suspended with savepoint {}.", (Object)savepoint);
                        }
                        catch (TimeoutException exception) {
                            throw new FlinkException(String.format("Timed out stopping the job %s in Flink cluster %s with savepoint, please configure a larger timeout via '%s'", new Object[]{jobId, clusterId, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()}), (Throwable)exception);
                        }
                    } else if (ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
                        AbstractFlinkService.LOG.info("Job is already in terminal state skipping cancel-with-savepoint operation.");
                    } else {
                        throw new RuntimeException("Unexpected non-terminal status: " + deploymentStatus);
                    }
                    if (deleteClusterAfterSavepoint) {
                        AbstractFlinkService.LOG.info("Cleaning up deployment after stop-with-savepoint");
                        this.deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
                        ** break;
                    }
lbl43:
                    // 3 sources

                    break;
                }
                case 3: {
                    this.deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false);
                    ** break;
lbl47:
                    // 1 sources

                    break;
                }
                default: {
                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
                }
            }
        }
        finally {
            if (clusterClient != null) {
                clusterClient.close();
            }
        }
        deploymentStatus.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        savepointOpt /* !! */ .ifPresent((Consumer<String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$cancelJob$0(org.apache.flink.core.execution.SavepointFormatType org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus java.lang.String ), (Ljava/lang/String;)V)((org.apache.flink.core.execution.SavepointFormatType)savepointFormatType, (FlinkDeploymentStatus)deploymentStatus));
        v0 = shutdownDisabled = upgradeMode != UpgradeMode.LAST_STATE && FlinkUtils.clusterShutdownDisabled((FlinkDeploymentSpec)ReconciliationUtils.getDeployedSpec(deployment)) != false;
        if (!shutdownDisabled) {
            this.waitForClusterShutdown(conf);
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        }
    }

    /*
     * Unable to fully structure code
     * Could not resolve type clashes
     */
    @Override
    public void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf) throws Exception {
        sessionJobStatus = (FlinkSessionJobStatus)sessionJob.getStatus();
        jobStatus = sessionJobStatus.getJobStatus();
        jobIdString = jobStatus.getJobId();
        Preconditions.checkNotNull((Object)jobIdString, (String)"The job to be suspend should not be null");
        jobId = JobID.fromHexString((String)jobIdString);
        savepointOpt /* !! */  = Optional.empty();
        AbstractFlinkService.LOG.debug("Current job state: {}", (Object)jobStatus.getState());
        if (!ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
            AbstractFlinkService.LOG.debug("Job is not in terminal state, cancelling it");
            clusterClient = this.getClusterClient(conf);
            try {
                clusterId = (String)clusterClient.getClusterId();
                switch (1.$SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[upgradeMode.ordinal()]) {
                    case 1: {
                        AbstractFlinkService.LOG.info("Cancelling job.");
                        clusterClient.cancel(jobId).get(this.configManager.getOperatorConfiguration().getFlinkCancelJobTimeout().toSeconds(), TimeUnit.SECONDS);
                        AbstractFlinkService.LOG.info("Job successfully cancelled.");
                        ** break;
lbl21:
                        // 1 sources

                    }
                    case 2: {
                        if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {
                            AbstractFlinkService.LOG.info("Suspending job with savepoint.");
                            savepointDirectory = (String)Preconditions.checkNotNull((Object)((String)conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)));
                            timeout = ((Duration)conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)).getSeconds();
                            try {
                                savepoint = (String)clusterClient.stopWithSavepoint(jobId, false, savepointDirectory, ((FlinkVersion)conf.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14) != false ? (org.apache.flink.core.execution.SavepointFormatType)conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE) : null).get(timeout, TimeUnit.SECONDS);
                                savepointOpt /* !! */  = Optional.of(savepoint);
                                AbstractFlinkService.LOG.info("Job successfully suspended with savepoint {}.", (Object)savepoint);
                                ** break;
lbl32:
                                // 1 sources

                            }
                            catch (TimeoutException exception) {
                                throw new FlinkException(String.format("Timed out stopping the job %s in Flink cluster %s with savepoint, please configure a larger timeout via '%s'", new Object[]{jobId, clusterId, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()}), (Throwable)exception);
                            }
                        }
                        throw new RuntimeException("Unexpected non-terminal status: " + jobStatus.getState());
                    }
                    default: {
                        throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
                    }
                }
            }
            finally {
                if (clusterClient != null) {
                    clusterClient.close();
                }
            }
        } else {
            AbstractFlinkService.LOG.debug("Job is in terminal state, skipping cancel");
        }
        jobStatus.setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        savepointOpt /* !! */ .ifPresent((Consumer<String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$cancelSessionJob$1(org.apache.flink.kubernetes.operator.api.status.JobStatus java.lang.String ), (Ljava/lang/String;)V)((JobStatus)jobStatus));
    }

    @Override
    public void triggerSavepoint(String jobId, SavepointTriggerType triggerType, org.apache.flink.kubernetes.operator.api.status.SavepointInfo savepointInfo, Configuration conf) throws Exception {
        LOG.info("Triggering new savepoint");
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
            SavepointTriggerMessageParameters savepointTriggerMessageParameters = savepointTriggerHeaders.getUnresolvedMessageParameters();
            savepointTriggerMessageParameters.jobID.resolve((Object)JobID.fromHexString((String)jobId));
            String savepointDirectory = (String)Preconditions.checkNotNull((Object)((String)conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)));
            long timeout = this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds();
            org.apache.flink.core.execution.SavepointFormatType savepointFormatType = SavepointUtils.getSavepointFormatType(conf);
            TriggerResponse response = (TriggerResponse)clusterClient.sendRequest((MessageHeaders)savepointTriggerHeaders, (MessageParameters)savepointTriggerMessageParameters, (RequestBody)new SavepointTriggerRequestBody(savepointDirectory, false, (org.apache.flink.core.execution.SavepointFormatType)(((FlinkVersion)conf.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14) ? savepointFormatType : null), null)).get(timeout, TimeUnit.SECONDS);
            LOG.info("Savepoint successfully triggered: " + response.getTriggerId().toHexString());
            savepointInfo.setTrigger(response.getTriggerId().toHexString(), triggerType, SavepointFormatType.valueOf((String)savepointFormatType.name()));
        }
    }

    @Override
    public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            CustomCheckpointingStatisticsHeaders headers = CustomCheckpointingStatisticsHeaders.getInstance();
            JobMessageParameters params = headers.getUnresolvedMessageParameters();
            params.jobPathParameter.resolve((Object)jobId);
            CompletableFuture response = clusterClient.sendRequest((MessageHeaders)headers, (MessageParameters)params, (RequestBody)EmptyRequestBody.getInstance());
            CheckpointHistoryWrapper checkpoints = (CheckpointHistoryWrapper)response.get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
            Optional<String> latestCheckpointOpt = checkpoints.getLatestCheckpointPath();
            if (latestCheckpointOpt.isPresent() && latestCheckpointOpt.get().equals("<checkpoint-not-externally-addressable>")) {
                throw new RecoveryFailureException("Latest checkpoint not externally addressable, manual recovery required.", "CheckpointNotFound");
            }
            Optional<Savepoint> optional = latestCheckpointOpt.map(pointer -> Savepoint.of((String)pointer, (SavepointTriggerType)SavepointTriggerType.UNKNOWN));
            return optional;
        }
    }

    @Override
    public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            clusterClient.sendRequest((MessageHeaders)SavepointDisposalTriggerHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)new SavepointDisposalRequest(savepointPath)).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public SavepointFetchResult fetchSavepointInfo(String triggerId, String jobId, Configuration conf) {
        LOG.info("Fetching savepoint result with triggerId: " + triggerId);
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
            SavepointStatusMessageParameters savepointStatusMessageParameters = savepointStatusHeaders.getUnresolvedMessageParameters();
            savepointStatusMessageParameters.jobIdPathParameter.resolve((Object)JobID.fromHexString((String)jobId));
            savepointStatusMessageParameters.triggerIdPathParameter.resolve((Object)TriggerId.fromHexString((String)triggerId));
            CompletableFuture response = clusterClient.sendRequest((MessageHeaders)savepointStatusHeaders, (MessageParameters)savepointStatusMessageParameters, (RequestBody)EmptyRequestBody.getInstance());
            if (response.get() == null || ((AsynchronousOperationResult)response.get()).resource() == null) {
                SavepointFetchResult savepointFetchResult = SavepointFetchResult.pending();
                return savepointFetchResult;
            }
            if (((SavepointInfo)((AsynchronousOperationResult)response.get()).resource()).getLocation() == null) {
                if (((SavepointInfo)((AsynchronousOperationResult)response.get()).resource()).getFailureCause() != null) {
                    LOG.error("Failure occurred while fetching the savepoint result", (Throwable)((SavepointInfo)((AsynchronousOperationResult)response.get()).resource()).getFailureCause());
                    SavepointFetchResult savepointFetchResult = SavepointFetchResult.error(((SavepointInfo)((AsynchronousOperationResult)response.get()).resource()).getFailureCause().toString());
                    return savepointFetchResult;
                }
                SavepointFetchResult savepointFetchResult = SavepointFetchResult.pending();
                return savepointFetchResult;
            }
            String location = ((SavepointInfo)((AsynchronousOperationResult)response.get()).resource()).getLocation();
            LOG.info("Savepoint result: {}", (Object)location);
            SavepointFetchResult savepointFetchResult = SavepointFetchResult.completed(location);
            return savepointFetchResult;
        }
        catch (Exception e) {
            LOG.error("Exception while fetching the savepoint result", (Throwable)e);
            return SavepointFetchResult.error(e.getMessage());
        }
    }

    @Override
    public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
        HashMap<String, String> runtimeVersion = new HashMap<String, String>();
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            CustomDashboardConfiguration dashboardConfiguration = (CustomDashboardConfiguration)clusterClient.sendRequest((MessageHeaders)CustomDashboardConfigurationHeaders.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance()).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
            runtimeVersion.put("flink-version", dashboardConfiguration.getFlinkVersion());
            runtimeVersion.put("flink-revision", dashboardConfiguration.getFlinkRevision());
        }
        return runtimeVersion;
    }

    @Override
    public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
        String clusterId;
        String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
        try (ClusterClient<String> clusterClient = this.getClusterClient(conf);){
            clusterId = (String)clusterClient.getClusterId();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
        return this.getJmPodList(namespace, clusterId);
    }

    @Override
    public void waitForClusterShutdown(Configuration conf) {
        this.waitForClusterShutdown(conf.getString(KubernetesConfigOptions.NAMESPACE), conf.getString(KubernetesConfigOptions.CLUSTER_ID), this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds());
    }

    @VisibleForTesting
    protected ClusterClient<String> getClusterClient(Configuration conf) throws Exception {
        String clusterId = (String)conf.get(KubernetesConfigOptions.CLUSTER_ID);
        String namespace = (String)conf.get(KubernetesConfigOptions.NAMESPACE);
        int port = conf.getInteger(RestOptions.PORT);
        String host = (String)ObjectUtils.firstNonNull((Object[])new String[]{this.configManager.getOperatorConfiguration().getFlinkServiceHostOverride(), ExternalServiceDecorator.getNamespacedExternalServiceName((String)clusterId, (String)namespace)});
        String restServerAddress = String.format("http://%s:%s", host, port);
        LOG.debug("Creating RestClusterClient({})", (Object)restServerAddress);
        return new RestClusterClient(conf, (Object)clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
    }

    private JarRunResponseBody runJar(JobSpec job, JobID jobID, JarUploadResponseBody response, Configuration conf, String savepoint) {
        String jarId = response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
        try {
            JarRunResponseBody jarRunResponseBody;
            block11: {
                RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);
                try {
                    JarRunHeaders headers = JarRunHeaders.getInstance();
                    JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
                    parameters.jarIdPathParameter.resolve((Object)jarId);
                    JarRunRequestBody runRequestBody = new JarRunRequestBody(job.getEntryClass(), null, job.getArgs() == null ? null : Arrays.asList(job.getArgs()), job.getParallelism() > 0 ? Integer.valueOf(job.getParallelism()) : null, jobID, job.getAllowNonRestoredState(), savepoint, (RestoreMode)(((FlinkVersion)conf.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14) ? RestoreMode.DEFAULT : null));
                    LOG.info("Submitting job: {} to session cluster.", (Object)jobID.toHexString());
                    jarRunResponseBody = (JarRunResponseBody)clusterClient.sendRequest((MessageHeaders)headers, (MessageParameters)parameters, (RequestBody)runRequestBody).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
                    if (clusterClient == null) break block11;
                }
                catch (Throwable throwable) {
                    try {
                        if (clusterClient != null) {
                            try {
                                clusterClient.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    catch (Exception e) {
                        LOG.error("Failed to submit job to session cluster.", (Throwable)e);
                        throw new FlinkRuntimeException((Throwable)e);
                    }
                }
                clusterClient.close();
            }
            return jarRunResponseBody;
        }
        finally {
            this.deleteJar(conf, jarId);
        }
    }

    private JarUploadResponseBody uploadJar(ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
        String targetDir = this.artifactManager.generateJarDir(objectMeta, spec);
        File jarFile = this.artifactManager.fetch(this.findJarURI(spec.getJob()), conf, targetDir);
        Preconditions.checkArgument((boolean)jarFile.exists(), (Object)String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
        JarUploadHeaders headers = JarUploadHeaders.getInstance();
        String clusterId = spec.getDeploymentName();
        String namespace = objectMeta.getNamespace();
        int port = conf.getInteger(RestOptions.PORT);
        String host = (String)ObjectUtils.firstNonNull((Object[])new String[]{this.configManager.getOperatorConfiguration().getFlinkServiceHostOverride(), ExternalServiceDecorator.getNamespacedExternalServiceName((String)clusterId, (String)namespace)});
        try {
            JarUploadResponseBody jarUploadResponseBody;
            try (RestClient restClient = new RestClient(conf, (Executor)this.executorService);){
                jarUploadResponseBody = (JarUploadResponseBody)restClient.sendRequest(host, port, (MessageHeaders)headers, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.singletonList(new FileUpload(jarFile.toPath(), "application/java-archive"))).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
            }
            return jarUploadResponseBody;
        }
        finally {
            LOG.debug("Deleting the jar file {}", (Object)jarFile);
            FileUtils.deleteFileOrDirectory((File)jarFile);
        }
    }

    private String findJarURI(JobSpec jobSpec) {
        if (jobSpec.getJarURI() != null) {
            return jobSpec.getJarURI();
        }
        return this.emptyJar;
    }

    private void deleteJar(Configuration conf, String jarId) {
        LOG.debug("Deleting the jar: {}", (Object)jarId);
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
            JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jarIdPathParameter.resolve((Object)jarId);
            clusterClient.sendRequest((MessageHeaders)headers, (MessageParameters)parameters, (RequestBody)EmptyRequestBody.getInstance()).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOG.error("Failed to delete the jar: {}.", (Object)jarId, (Object)e);
        }
    }

    @VisibleForTesting
    void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
        boolean jobManagerRunning = true;
        boolean serviceRunning = true;
        int i = 0;
        while ((long)i < shutdownTimeout) {
            Service service;
            PodList jmPodList;
            if (jobManagerRunning && ((jmPodList = this.getJmPodList(namespace, clusterId)) == null || jmPodList.getItems().isEmpty())) {
                jobManagerRunning = false;
            }
            if (serviceRunning && (service = (Service)((ServiceResource)((NonNamespaceOperation)this.kubernetesClient.services().inNamespace(namespace)).withName(ExternalServiceDecorator.getExternalServiceName((String)clusterId))).fromServer().get()) == null) {
                serviceRunning = false;
            }
            if (!jobManagerRunning && !serviceRunning) break;
            if ((i + 1) % 5 == 0) {
                LOG.info("Waiting for cluster shutdown... ({}s)", (Object)(i + 1));
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            ++i;
        }
        LOG.info("Cluster shutdown completed.");
    }

    private static List<JobStatusMessage> toJobStatusMessage(MultipleJobsDetails multipleJobsDetails) {
        return multipleJobsDetails.getJobs().stream().map(details -> new JobStatusMessage(details.getJobId(), details.getJobName(), AbstractFlinkService.getEffectiveStatus(details), details.getStartTime())).collect(Collectors.toList());
    }

    @VisibleForTesting
    protected static Configuration removeOperatorConfigs(Configuration config) {
        Configuration newConfig = new Configuration();
        config.toMap().forEach((k, v) -> {
            if (!k.startsWith("kubernetes.operator.")) {
                newConfig.setString(k, v);
            }
        });
        return newConfig;
    }

    @VisibleForTesting
    protected static org.apache.flink.api.common.JobStatus getEffectiveStatus(JobDetails details) {
        int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
        int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
        boolean allRunningOrFinished = details.getNumTasks() == numRunning + numFinished;
        org.apache.flink.api.common.JobStatus effectiveStatus = details.getStatus();
        if (org.apache.flink.api.common.JobStatus.RUNNING.equals((Object)effectiveStatus) && !allRunningOrFinished) {
            effectiveStatus = org.apache.flink.api.common.JobStatus.CREATED;
            LOG.debug("Adjusting job state from {} to {}", (Object)org.apache.flink.api.common.JobStatus.RUNNING, (Object)effectiveStatus);
        }
        return effectiveStatus;
    }

    private void validateHaMetadataExists(Configuration conf) {
        if (!this.isHaMetadataAvailable(conf)) {
            throw new RecoveryFailureException("HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "RestoreFailed");
        }
    }

    private String createEmptyJar() {
        try {
            String emptyJarPath = Files.createTempDirectory("flink", new FileAttribute[0]).toString() + "/empty.jar";
            LOG.debug("Creating empty jar to {}", (Object)emptyJarPath);
            JarOutputStream target = new JarOutputStream((OutputStream)new FileOutputStream(emptyJarPath), new Manifest());
            target.close();
            return emptyJarPath;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create empty jar", e);
        }
    }

    @Override
    public Map<String, String> getMetrics(Configuration conf, String jobId, List<String> metricNames) throws Exception {
        try (RestClusterClient clusterClient = (RestClusterClient)this.getClusterClient(conf);){
            JobMetricsMessageParameters jobMetricsMessageParameters = JobMetricsHeaders.getInstance().getUnresolvedMessageParameters();
            jobMetricsMessageParameters.jobPathParameter.resolve((Object)JobID.fromHexString((String)jobId));
            jobMetricsMessageParameters.metricsFilterParameter.resolve(metricNames);
            MetricCollectionResponseBody responseBody = (MetricCollectionResponseBody)clusterClient.sendRequest((MessageHeaders)JobMetricsHeaders.getInstance(), (MessageParameters)jobMetricsMessageParameters, (RequestBody)EmptyRequestBody.getInstance()).get();
            Map<String, String> map = responseBody.getMetrics().stream().map(metric -> Tuple2.of((Object)metric.getId(), (Object)metric.getValue())).collect(Collectors.toMap(t -> (String)t.f0, t -> (String)t.f1));
            return map;
        }
    }

    @Override
    public final void deleteClusterDeployment(ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
        this.deleteClusterInternal(meta, deleteHaData);
        this.updateStatusAfterClusterDeletion(status);
    }

    protected abstract void deleteClusterInternal(ObjectMeta var1, boolean var2);

    protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        String currentJobState = status.getJobStatus().getState();
        if (currentJobState == null || !org.apache.flink.api.common.JobStatus.valueOf((String)currentJobState).isGloballyTerminalState()) {
            status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        }
    }

    private static /* synthetic */ void lambda$cancelSessionJob$1(JobStatus jobStatus, String location) {
        Savepoint sp = Savepoint.of((String)location, (SavepointTriggerType)SavepointTriggerType.UPGRADE);
        jobStatus.getSavepointInfo().updateLastSavepoint(sp);
    }

    private static /* synthetic */ void lambda$cancelJob$0(org.apache.flink.core.execution.SavepointFormatType savepointFormatType, FlinkDeploymentStatus deploymentStatus, String location) {
        Savepoint sp = Savepoint.of((String)location, (SavepointTriggerType)SavepointTriggerType.UPGRADE, (SavepointFormatType)SavepointFormatType.valueOf((String)savepointFormatType.name()));
        deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp);
    }
}

