/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.GracePeriodConfigurable;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.Waitable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.LambdaMetafactory;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.service.CustomCheckpointingStatisticsHeaders;
import org.apache.flink.kubernetes.operator.service.CustomDashboardConfiguration;
import org.apache.flink.kubernetes.operator.service.CustomDashboardConfigurationHeaders;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFlinkService
implements FlinkService {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
    // File name constant; its consumer is not visible in this part of the file
    // (presumably the helper that builds EMPTY_JAR — confirm).
    private static final String EMPTY_JAR_FILENAME = "empty.jar";
    // Public string keys; presumably field names in a resource-overview map — usage is outside this excerpt.
    public static final String FIELD_NAME_TOTAL_CPU = "total-cpu";
    public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory";
    /** Kubernetes client supplied at construction; exposed through {@link #getKubernetesClient()}. */
    protected final KubernetesClient kubernetesClient;
    /** Executor supplied at construction; not used by the methods visible in this excerpt. */
    protected final ExecutorService executorService;
    /** Operator-level settings; the REST calls below read their client/cancel timeouts from it. */
    protected final FlinkOperatorConfiguration operatorConfig;
    /** Artifact manager supplied at construction; not used by the methods visible in this excerpt. */
    protected final ArtifactManager artifactManager;
    // Computed once during class initialization by createEmptyJar(), which is defined elsewhere in this class.
    private static final String EMPTY_JAR = AbstractFlinkService.createEmptyJar();

    /**
     * Creates the service and stores the supplied collaborators without validating or copying them.
     *
     * @param kubernetesClient client used for Kubernetes API access
     * @param artifactManager manager for job artifacts
     * @param executorService executor made available to subclasses
     * @param operatorConfig operator-level configuration (timeouts etc.)
     */
    public AbstractFlinkService(KubernetesClient kubernetesClient, ArtifactManager artifactManager, ExecutorService executorService, FlinkOperatorConfiguration operatorConfig) {
        this.operatorConfig = operatorConfig;
        this.executorService = executorService;
        this.artifactManager = artifactManager;
        this.kubernetesClient = kubernetesClient;
    }

    /**
     * Returns a pod list for the two given strings.
     *
     * <p>NOTE(review): parameter names were lost in decompilation. Judging by the method name this
     * lists JobManager pods, and the arguments are presumably namespace and cluster id — confirm
     * the order against the concrete implementations.
     */
    protected abstract PodList getJmPodList(String var1, String var2);

    /**
     * Returns a pod list for the two given strings.
     *
     * <p>NOTE(review): parameter names were lost in decompilation. Judging by the method name this
     * lists TaskManager pods, and the arguments are presumably namespace and cluster id — confirm
     * the order against the concrete implementations.
     */
    protected abstract PodList getTmPodList(String var1, String var2);

    /**
     * Deployment hook invoked by {@code submitApplicationCluster} with the job spec and the
     * configuration returned by {@code removeOperatorConfigs(conf)}.
     *
     * @throws Exception if the implementation fails to deploy
     */
    protected abstract void deployApplicationCluster(JobSpec var1, Configuration var2) throws Exception;

    /**
     * Deployment hook invoked by {@code submitSessionCluster} with the caller's configuration,
     * passed through unchanged.
     *
     * @throws Exception if the implementation fails to deploy
     */
    protected abstract void deploySessionCluster(Configuration var1) throws Exception;

    /** Returns the Kubernetes client this service was constructed with. */
    @Override
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }

    /**
     * Deploys an application cluster.
     *
     * <p>Before deploying, the job graph stored in HA storage is deleted when Kubernetes or
     * ZooKeeper HA is active. When {@code requireHaMetadata} is set, the presence of HA metadata
     * is validated afterwards. Deployment itself is delegated to {@code deployApplicationCluster}
     * with the result of {@code removeOperatorConfigs(conf)}.
     *
     * @param jobSpec job specification handed to the deployment hook
     * @param conf effective configuration of the deployment
     * @param requireHaMetadata whether HA metadata must exist before deploying
     * @throws Exception if validation or deployment fails
     */
    @Override
    public void submitApplicationCluster(JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
        String haNote = requireHaMetadata ? " requiring last-state from HA metadata" : "";
        LOG.info("Deploying application cluster{}", haNote);

        if (FlinkUtils.isKubernetesHAActivated(conf)) {
            FlinkUtils.deleteJobGraphInKubernetesHA(
                    conf.get(KubernetesConfigOptions.CLUSTER_ID),
                    conf.get(KubernetesConfigOptions.NAMESPACE),
                    kubernetesClient);
        } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
            FlinkUtils.deleteJobGraphInZookeeperHA(conf);
        }

        if (requireHaMetadata) {
            validateHaMetadataExists(conf);
        }

        Configuration deployConf = removeOperatorConfigs(conf);
        deployApplicationCluster(jobSpec, deployConf);
    }

    /**
     * Deploys a session cluster by delegating to {@code deploySessionCluster} with the given
     * configuration unchanged.
     *
     * @param conf effective configuration of the session cluster
     * @throws Exception if the deployment hook fails
     */
    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        deploySessionCluster(conf);
    }

    /**
     * Reports whether HA metadata is available for the given configuration.
     *
     * <p>Kubernetes HA is checked first, then ZooKeeper HA; with neither active the answer is
     * {@code false}.
     *
     * @param conf effective configuration of the cluster
     * @return {@code true} if the active HA backend reports metadata as available
     */
    @Override
    public boolean isHaMetadataAvailable(Configuration conf) {
        if (FlinkUtils.isKubernetesHAActivated(conf)) {
            return FlinkUtils.isKubernetesHaMetadataAvailable(conf, kubernetesClient);
        }
        return FlinkUtils.isZookeeperHAActivated(conf)
                && FlinkUtils.isZookeeperHaMetadataAvailable(conf);
    }

    /**
     * Uploads the job jar via {@code uploadJar} and runs it via {@code runJar} under the given
     * job id.
     *
     * @param meta metadata of the session job resource
     * @param spec session job specification
     * @param jobID id to run the job under
     * @param conf effective configuration
     * @param savepoint optional savepoint path forwarded to {@code runJar}; may be {@code null}
     * @return the {@code jobID} that was passed in
     * @throws Exception if upload or submission fails
     */
    @Override
    public JobID submitJobToSessionCluster(ObjectMeta meta, FlinkSessionJobSpec spec, JobID jobID, Configuration conf, @Nullable String savepoint) throws Exception {
        runJar(
                spec.getJob(),
                jobID,
                uploadJar(meta, spec, conf),
                conf,
                savepoint);
        LOG.info("Submitted job: {} to session cluster.", jobID);
        return jobID;
    }

    /**
     * Checks whether the JobManager REST endpoint accepts TCP connections.
     *
     * <p>The endpoint address is resolved through a short-lived cluster client, then a plain
     * socket connect with a one second timeout is attempted.
     *
     * @param config effective configuration used to build the cluster client
     * @return {@code true} if the connection was established, {@code false} on any
     *     {@link IOException} while connecting or closing the socket
     * @throws FlinkRuntimeException if the endpoint address cannot be determined
     */
    @Override
    public boolean isJobManagerPortReady(Configuration config) {
        final SocketAddress socketAddress;
        try (RestClusterClient<String> clusterClient = getClusterClient(config)) {
            socketAddress = getSocketAddress(clusterClient);
        } catch (Exception ex) {
            throw new FlinkRuntimeException(ex);
        }

        final int connectTimeoutMillis = 1000;
        // try-with-resources closes the socket on every path; an IOException from connect()
        // or from close() both mean "not ready" rather than an error for the caller.
        try (Socket socket = new Socket()) {
            socket.connect(socketAddress, connectTimeoutMillis);
            return true;
        } catch (IOException ignored) {
            // An unreachable port is an expected outcome of this probe.
            return false;
        }
    }

    /**
     * Derives the socket address of the JobManager web interface from the cluster client.
     *
     * <p>The URL is fetched once so that the logged value is exactly the one that gets parsed.
     * When the URL carries no explicit port, the protocol's default port is used instead of
     * {@code -1}, which {@link InetSocketAddress} would reject.
     *
     * @param clusterClient client whose web interface URL is inspected
     * @return address built from the URL's host and port
     * @throws MalformedURLException if the reported web interface URL cannot be parsed
     */
    protected SocketAddress getSocketAddress(RestClusterClient<String> clusterClient) throws MalformedURLException {
        String webInterfaceUrl = clusterClient.getWebInterfaceURL();
        LOG.debug("JobManager webinterface url {}", webInterfaceUrl);
        URL url = new URL(webInterfaceUrl);
        int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
        return new InetSocketAddress(url.getHost(), port);
    }

    /**
     * Requests the jobs overview from the cluster and converts it with
     * {@code JobStatusUtils.toJobStatusMessage}.
     *
     * @param conf effective configuration used to build the cluster client
     * @return status messages of the jobs reported by the cluster
     * @throws Exception if the request fails or exceeds the configured Flink client timeout
     */
    @Override
    public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            return clusterClient
                    .sendRequest(
                            JobsOverviewHeaders.getInstance(),
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance())
                    .thenApply(JobStatusUtils::toJobStatusMessage)
                    .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Requests the result of the given job from the cluster.
     *
     * @param conf effective configuration used to build the cluster client
     * @param jobID id of the job whose result is requested
     * @return the job result reported by the cluster
     * @throws Exception if the request fails or exceeds the configured Flink client timeout
     */
    @Override
    public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            long timeoutSeconds = operatorConfig.getFlinkClientTimeout().getSeconds();
            return clusterClient.requestJobResult(jobID).get(timeoutSeconds, TimeUnit.SECONDS);
        }
    }

    /*
     * Cancels the job of a FlinkDeployment according to the upgrade mode and records the outcome
     * on the deployment status.
     *
     * Decompiler notes (CFR):
     *   - Unable to fully structure code
     *   - Could not resolve type clashes
     * As a consequence this method is NOT valid Java as printed: local declarations are missing,
     * "** break" / "lblNN" are unresolved jumps, the switch uses a synthetic ordinal map and the
     * savepoint callback is shown as a raw LambdaMetafactory call. Treat the text as a guide to
     * the bytecode, not as source.
     *
     * Parameters:
     *   deployment                   resource whose status is read and updated
     *   upgradeMode                  selects the switch branch below
     *   conf                         source of savepoint directory, timeouts and format type
     *   deleteClusterAfterSavepoint  in the savepoint branch, whether the cluster deployment is
     *                                deleted after the stop-with-savepoint
     * Throws: Exception from the REST calls / cluster deletion; FlinkException on savepoint timeout;
     *         RuntimeException for an unsupported mode or an unexpected job state.
     */
    protected void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf, boolean deleteClusterAfterSavepoint) throws Exception {
        deploymentStatus = (FlinkDeploymentStatus)deployment.getStatus();
        jobIdString = deploymentStatus.getJobStatus().getJobId();
        // A missing job id is tolerated here; the branches that need it guard with checkNotNull.
        jobId = jobIdString != null ? JobID.fromHexString((String)jobIdString) : null;
        savepointOpt /* !! */  = Optional.empty();
        savepointFormatType = (org.apache.flink.core.execution.SavepointFormatType)conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
        clusterClient = this.getClusterClient(conf);
        try {
            clusterId = (String)clusterClient.getClusterId();
            // Synthetic enum switch map: which UpgradeMode constant each case number stands for
            // is defined in a generated class that is not visible here.
            switch (1.$SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[upgradeMode.ordinal()]) {
                // NOTE(review): presumably the stateless mode — confirm against the switch map.
                case 1: {
                    if (ReconciliationUtils.isJobRunning((CommonStatus)deployment.getStatus())) {
                        AbstractFlinkService.LOG.info("Job is running, cancelling job.");
                        try {
                            clusterClient.cancel((JobID)Preconditions.checkNotNull((Object)jobId)).get(this.operatorConfig.getFlinkCancelJobTimeout().toSeconds(), TimeUnit.SECONDS);
                            AbstractFlinkService.LOG.info("Job successfully cancelled.");
                        }
                        catch (Exception e) {
                            // Best effort: a failed cancel is only logged, the cluster is deleted anyway.
                            AbstractFlinkService.LOG.error("Could not shut down cluster gracefully, deleting...", (Throwable)e);
                        }
                    }
                    this.deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true);
                    ** break;
lbl22:
                    // 1 sources

                    break;
                }
                // NOTE(review): presumably UpgradeMode.SAVEPOINT (stop-with-savepoint) — confirm against the switch map.
                case 2: {
                    // The savepoint directory is mandatory in this branch; checkNotNull fails fast otherwise.
                    savepointDirectory = (String)Preconditions.checkNotNull((Object)((String)conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)));
                    // The stop-with-savepoint wait is bounded by the job's checkpointing timeout.
                    timeout = ((Duration)conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)).getSeconds();
                    if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
                        try {
                            AbstractFlinkService.LOG.info("Suspending job with savepoint.");
                            savepoint = (String)clusterClient.stopWithSavepoint((JobID)Preconditions.checkNotNull((Object)jobId), conf.getBoolean(KubernetesOperatorConfigOptions.DRAIN_ON_SAVEPOINT_DELETION), savepointDirectory, savepointFormatType).get(timeout, TimeUnit.SECONDS);
                            savepointOpt /* !! */  = Optional.of(savepoint);
                            AbstractFlinkService.LOG.info("Job successfully suspended with savepoint {}.", (Object)savepoint);
                        }
                        catch (TimeoutException exception) {
                            // Timeouts are rethrown as FlinkException with a hint about the relevant config key.
                            throw new FlinkException(String.format("Timed out stopping the job %s in Flink cluster %s with savepoint, please configure a larger timeout via '%s'", new Object[]{jobId, clusterId, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()}), (Throwable)exception);
                        }
                        catch (Exception e) {
                            // If the failure wraps a StopWithSavepointStoppingException, its savepoint path is captured.
                            stopWithSavepointException = ExceptionUtils.findThrowableSerializedAware((Throwable)e, StopWithSavepointStoppingException.class, (ClassLoader)this.getClass().getClassLoader());
                            if (stopWithSavepointException.isPresent()) {
                                savepointOpt /* !! */  = Optional.of(((StopWithSavepointStoppingException)stopWithSavepointException.get()).getSavepointPath());
                            }
                            // NOTE(review): as printed the rethrow is unconditional, which would make the
                            // assignment above dead; presumably the original only rethrows when no savepoint
                            // path was found and the decompiler lost the jump — verify against the bytecode.
                            throw e;
                        }
                    } else if (ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
                        AbstractFlinkService.LOG.info("Job is already in terminal state skipping cancel-with-savepoint operation.");
                    } else {
                        throw new RuntimeException("Unexpected non-terminal status: " + deploymentStatus);
                    }
                    if (deleteClusterAfterSavepoint) {
                        AbstractFlinkService.LOG.info("Cleaning up deployment after stop-with-savepoint");
                        this.deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, true);
                        ** break;
                    }
lbl49:
                    // 3 sources

                    break;
                }
                // NOTE(review): presumably the last-state mode — confirm against the switch map.
                // No REST call is made; the cluster deployment is deleted with the last flag set to false
                // (the flag's meaning is defined by deleteClusterDeployment, outside this excerpt).
                case 3: {
                    this.deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, conf, false);
                    ** break;
lbl53:
                    // 1 sources

                    break;
                }
                default: {
                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
                }
            }
        }
        finally {
            // Manual expansion of a resource close: the client is closed on every exit path.
            if (clusterClient != null) {
                clusterClient.close();
            }
        }
        // Reached only when no branch threw: the job is recorded as FINISHED.
        deploymentStatus.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        // Raw lambda bootstrap: invokes synthetic lambda$cancelJob$0 with the savepoint format type,
        // the deployment status and the savepoint path, if a path was captured. The lambda body is
        // not part of this excerpt.
        savepointOpt /* !! */ .ifPresent((Consumer<String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$cancelJob$0(org.apache.flink.core.execution.SavepointFormatType org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus java.lang.String ), (Ljava/lang/String;)V)((org.apache.flink.core.execution.SavepointFormatType)savepointFormatType, (FlinkDeploymentStatus)deploymentStatus));
        // The JobManager deployment is marked MISSING unless this was a savepoint upgrade that kept the cluster.
        if (deleteClusterAfterSavepoint || upgradeMode != UpgradeMode.SAVEPOINT) {
            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        }
    }

    /*
     * Cancels a session job according to the upgrade mode and records the outcome on the job status.
     *
     * Decompiler notes (CFR):
     *   - Unable to fully structure code
     *   - Could not resolve type clashes
     * As a consequence this method is NOT valid Java as printed: local declarations are missing,
     * "** break" / "lblNN" are unresolved jumps, the switch uses a synthetic ordinal map and the
     * savepoint callback is shown as a raw LambdaMetafactory call.
     *
     * Parameters:
     *   sessionJob   resource whose job status is read and updated
     *   upgradeMode  selects the switch branch below
     *   conf         source of savepoint directory, timeouts and format type
     * Throws: NullPointerException if the status carries no job id; FlinkException on savepoint
     *         timeout; RuntimeException for an unsupported mode or an unexpected job state;
     *         any Exception from the REST calls.
     */
    @Override
    public void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf) throws Exception {
        sessionJobStatus = (FlinkSessionJobStatus)sessionJob.getStatus();
        jobStatus = sessionJobStatus.getJobStatus();
        jobIdString = jobStatus.getJobId();
        // Unlike cancelJob, a job id is mandatory here.
        Preconditions.checkNotNull((Object)jobIdString, (String)"The job to be suspend should not be null");
        jobId = JobID.fromHexString((String)jobIdString);
        savepointOpt /* !! */  = Optional.empty();
        AbstractFlinkService.LOG.debug("Current job state: {}", (Object)jobStatus.getState());
        // The cluster is only contacted when the job has not already reached a terminal state.
        if (!ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
            AbstractFlinkService.LOG.debug("Job is not in terminal state, cancelling it");
            clusterClient = this.getClusterClient(conf);
            try {
                clusterId = (String)clusterClient.getClusterId();
                // Synthetic enum switch map: which UpgradeMode constant each case number stands for
                // is defined in a generated class that is not visible here.
                switch (1.$SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[upgradeMode.ordinal()]) {
                    // NOTE(review): presumably the stateless mode — confirm against the switch map.
                    // Unlike cancelJob, a failing cancel is not caught here and propagates to the caller.
                    case 1: {
                        AbstractFlinkService.LOG.info("Cancelling job.");
                        clusterClient.cancel(jobId).get(this.operatorConfig.getFlinkCancelJobTimeout().toSeconds(), TimeUnit.SECONDS);
                        AbstractFlinkService.LOG.info("Job successfully cancelled.");
                        // NOTE(review): "** break" is an unresolved jump; as printed there is no real break
                        // before the next case. Presumably the original leaves the switch here — verify.
                        ** break;
lbl21:
                        // 1 sources

                    }
                    // NOTE(review): presumably the savepoint mode — confirm against the switch map.
                    case 2: {
                        if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {
                            AbstractFlinkService.LOG.info("Suspending job with savepoint.");
                            // The savepoint directory is mandatory; the wait is bounded by the checkpointing timeout.
                            savepointDirectory = (String)Preconditions.checkNotNull((Object)((String)conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)));
                            timeout = ((Duration)conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)).getSeconds();
                            try {
                                savepoint = (String)clusterClient.stopWithSavepoint(jobId, conf.getBoolean(KubernetesOperatorConfigOptions.DRAIN_ON_SAVEPOINT_DELETION), savepointDirectory, (org.apache.flink.core.execution.SavepointFormatType)conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)).get(timeout, TimeUnit.SECONDS);
                                savepointOpt /* !! */  = Optional.of(savepoint);
                                AbstractFlinkService.LOG.info("Job successfully suspended with savepoint {}.", (Object)savepoint);
                                ** break;
lbl32:
                                // 1 sources

                            }
                            catch (TimeoutException exception) {
                                // Timeouts are rethrown as FlinkException with a hint about the relevant config key.
                                throw new FlinkException(String.format("Timed out stopping the job %s in Flink cluster %s with savepoint, please configure a larger timeout via '%s'", new Object[]{jobId, clusterId, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()}), (Throwable)exception);
                            }
                        }
                        // NOTE(review): presumably the else-branch of the isJobRunning check (job neither
                        // running nor terminal); the success path above leaves via the unresolved jump.
                        throw new RuntimeException("Unexpected non-terminal status: " + jobStatus.getState());
                    }
                    default: {
                        throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
                    }
                }
            }
            finally {
                // Manual expansion of a resource close: the client is closed on every exit path.
                if (clusterClient != null) {
                    clusterClient.close();
                }
            }
        } else {
            AbstractFlinkService.LOG.debug("Job is in terminal state, skipping cancel");
        }
        // Reached only when no branch threw: the job is recorded as FINISHED.
        jobStatus.setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        // Raw lambda bootstrap: invokes synthetic lambda$cancelSessionJob$1 with the job status and
        // the savepoint path, if a path was captured. The lambda body is not part of this excerpt.
        savepointOpt /* !! */ .ifPresent((Consumer<String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$cancelSessionJob$1(org.apache.flink.kubernetes.operator.api.status.JobStatus java.lang.String ), (Ljava/lang/String;)V)((JobStatus)jobStatus));
    }

    /**
     * Triggers a savepoint for the given job and records the trigger on {@code savepointInfo}.
     *
     * <p>The savepoint directory must be configured; the format type comes from the operator
     * savepoint format option and the request wait is bounded by the Flink client timeout.
     *
     * @param jobId hex string of the job id
     * @param triggerType trigger type recorded together with the trigger id
     * @param savepointInfo status object receiving the trigger id, type and format
     * @param conf effective configuration
     * @throws Exception if the directory is missing or the request fails or times out
     */
    @Override
    public void triggerSavepoint(String jobId, SnapshotTriggerType triggerType, org.apache.flink.kubernetes.operator.api.status.SavepointInfo savepointInfo, Configuration conf) throws Exception {
        LOG.info("Triggering new savepoint");
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            SavepointTriggerHeaders headers = SavepointTriggerHeaders.getInstance();
            SavepointTriggerMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jobID.resolve(JobID.fromHexString(jobId));

            String targetDirectory = Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
            long timeoutSeconds = operatorConfig.getFlinkClientTimeout().getSeconds();
            org.apache.flink.core.execution.SavepointFormatType formatType = conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);

            SavepointTriggerRequestBody requestBody = new SavepointTriggerRequestBody(targetDirectory, false, formatType, null);
            TriggerResponse response = clusterClient.sendRequest(headers, parameters, requestBody).get(timeoutSeconds, TimeUnit.SECONDS);

            String triggerIdHex = response.getTriggerId().toHexString();
            LOG.info("Savepoint successfully triggered: " + triggerIdHex);
            savepointInfo.setTrigger(triggerIdHex, triggerType, SavepointFormatType.valueOf(formatType.name()));
        }
    }

    /**
     * Triggers a {@code FULL} checkpoint for the given job and records the trigger on
     * {@code checkpointInfo}.
     *
     * @param jobId hex string of the job id
     * @param triggerType trigger type recorded together with the trigger id
     * @param checkpointInfo status object receiving the trigger id, type and checkpoint type
     * @param conf effective configuration
     * @throws Exception if the request fails or exceeds the Flink client timeout
     */
    @Override
    public void triggerCheckpoint(String jobId, SnapshotTriggerType triggerType, org.apache.flink.kubernetes.operator.api.status.CheckpointInfo checkpointInfo, Configuration conf) throws Exception {
        LOG.info("Triggering new checkpoint");
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            CheckpointTriggerHeaders headers = CheckpointTriggerHeaders.getInstance();
            CheckpointTriggerMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jobID.resolve(JobID.fromHexString(jobId));

            long timeoutSeconds = operatorConfig.getFlinkClientTimeout().getSeconds();
            org.apache.flink.core.execution.CheckpointType requestedType = org.apache.flink.core.execution.CheckpointType.FULL;

            CheckpointTriggerRequestBody requestBody = new CheckpointTriggerRequestBody(requestedType, null);
            TriggerResponse response = clusterClient.sendRequest(headers, parameters, requestBody).get(timeoutSeconds, TimeUnit.SECONDS);

            String triggerIdHex = response.getTriggerId().toHexString();
            LOG.info("Checkpoint successfully triggered: " + triggerIdHex);
            checkpointInfo.setTrigger(triggerIdHex, triggerType, CheckpointType.valueOf(requestedType.name()));
        }
    }

    /**
     * Returns the latest completed checkpoint of the job as a {@link Savepoint}, if there is one.
     *
     * @param jobId id of the job to inspect
     * @param conf effective configuration
     * @return the latest completed checkpoint with trigger type {@code UNKNOWN}, or empty
     * @throws RecoveryFailureException if the latest checkpoint's external pointer is the
     *     "not externally addressable" marker
     * @throws Exception if fetching the checkpoint information fails
     */
    @Override
    public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
        Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo> latestCompleted = getCheckpointInfo(jobId, conf).f0;

        boolean notAddressable = latestCompleted
                .filter(checkpoint -> checkpoint.getExternalPointer().equals("<checkpoint-not-externally-addressable>"))
                .isPresent();
        if (notAddressable) {
            throw new RecoveryFailureException("Latest checkpoint not externally addressable, manual recovery required.", "CheckpointNotFound");
        }

        return latestCompleted.map(
                checkpoint -> Savepoint.of(
                        checkpoint.getExternalPointer(),
                        checkpoint.getTimestamp(),
                        SnapshotTriggerType.UNKNOWN));
    }

    /**
     * Fetches the checkpoint history of the job and extracts the latest completed and the
     * in-progress checkpoint.
     *
     * @param jobId id of the job to inspect
     * @param conf effective configuration
     * @return tuple of (latest completed checkpoint, in-progress checkpoint)
     * @throws Exception if the request fails or exceeds the Flink client timeout
     */
    @Override
    public Tuple2<Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>, Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>> getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            CustomCheckpointingStatisticsHeaders statisticsHeaders = CustomCheckpointingStatisticsHeaders.getInstance();
            JobMessageParameters jobParameters = statisticsHeaders.getUnresolvedMessageParameters();
            jobParameters.jobPathParameter.resolve(jobId);

            CompletableFuture<CheckpointHistoryWrapper> pendingResponse =
                    clusterClient.sendRequest(statisticsHeaders, jobParameters, EmptyRequestBody.getInstance());
            long timeoutSeconds = operatorConfig.getFlinkClientTimeout().getSeconds();
            CheckpointHistoryWrapper history = pendingResponse.get(timeoutSeconds, TimeUnit.SECONDS);

            return Tuple2.of(history.getLatestCompletedCheckpoint(), history.getInProgressCheckpoint());
        }
    }

    /**
     * Sends a savepoint disposal request for the given path and waits for the response.
     *
     * @param savepointPath path of the savepoint to dispose
     * @param conf effective configuration
     * @throws Exception if the request fails or exceeds the Flink client timeout
     */
    @Override
    public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            SavepointDisposalRequest disposalRequest = new SavepointDisposalRequest(savepointPath);
            long timeoutSeconds = operatorConfig.getFlinkClientTimeout().getSeconds();
            clusterClient
                    .sendRequest(
                            SavepointDisposalTriggerHeaders.getInstance(),
                            EmptyMessageParameters.getInstance(),
                            disposalRequest)
                    .get(timeoutSeconds, TimeUnit.SECONDS);
        }
    }

    /**
     * Fetches the status of a previously triggered savepoint.
     *
     * <p>The wait for the REST response is bounded by the operator's Flink client timeout, like
     * the other REST calls of this service. The response is read once and classified:
     *
     * <ul>
     *   <li>no result or no resource yet: pending</li>
     *   <li>a location is present: completed with that location</li>
     *   <li>no location but a failure cause: error carrying the cause's string form</li>
     *   <li>neither location nor failure cause: pending</li>
     * </ul>
     *
     * <p>This method never throws; any exception (including a timeout) is logged and turned into
     * an error result whose message is never {@code null}.
     *
     * @param triggerId hex string of the savepoint trigger id
     * @param jobId hex string of the job id
     * @param conf effective configuration used to build the cluster client
     * @return pending, completed or error result as described above
     */
    @Override
    public SavepointFetchResult fetchSavepointInfo(String triggerId, String jobId, Configuration conf) {
        LOG.info("Fetching savepoint result with triggerId: {}", triggerId);
        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
            SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
            SavepointStatusMessageParameters savepointStatusMessageParameters = savepointStatusHeaders.getUnresolvedMessageParameters();
            savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
            savepointStatusMessageParameters.triggerIdPathParameter.resolve(TriggerId.fromHexString(triggerId));

            CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
                    clusterClient.sendRequest(savepointStatusHeaders, savepointStatusMessageParameters, EmptyRequestBody.getInstance());
            // Bounded wait, read exactly once: an unresponsive cluster must not block the caller.
            AsynchronousOperationResult<SavepointInfo> result =
                    response.get(operatorConfig.getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);

            if (result == null || result.resource() == null) {
                return SavepointFetchResult.pending();
            }

            SavepointInfo savepoint = result.resource();
            String location = savepoint.getLocation();
            if (location != null) {
                LOG.info("Savepoint result: {}", location);
                return SavepointFetchResult.completed(location);
            }

            if (savepoint.getFailureCause() != null) {
                LOG.error("Failure occurred while fetching the savepoint result", savepoint.getFailureCause());
                return SavepointFetchResult.error(savepoint.getFailureCause().toString());
            }
            return SavepointFetchResult.pending();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the calling thread instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Exception while fetching the savepoint result", e);
            // getMessage() is null for e.g. TimeoutException; fall back so the error is never empty.
            String message = e.getMessage() != null ? e.getMessage() : e.toString();
            return SavepointFetchResult.error(message);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Queries the cluster REST API for the status of a previously triggered checkpoint.
     *
     * <p>Outcome mapping:
     * <ul>
     *   <li>no response, or a response without a resource: pending</li>
     *   <li>resource carrying a failure cause: error (the cause is logged)</li>
     *   <li>queue status {@code IN_PROGRESS}: pending</li>
     *   <li>queue status {@code COMPLETED}: completed</li>
     *   <li>any other queue status, or any exception while fetching: error</li>
     * </ul>
     *
     * @param triggerId hex string of the checkpoint trigger id
     * @param jobId hex string of the job id
     * @param conf configuration used to create the REST cluster client
     * @return the fetch result; exceptions are logged and turned into an error result rather than
     *     being propagated
     */
    @Override
    public CheckpointFetchResult fetchCheckpointInfo(String triggerId, String jobId, Configuration conf) {
        LOG.info("Fetching checkpoint result with triggerId: " + triggerId);
        try (RestClusterClient<String> restClusterClient = this.getClusterClient(conf)) {
            CheckpointStatusHeaders statusHeaders = CheckpointStatusHeaders.getInstance();
            CheckpointStatusMessageParameters statusParameters = statusHeaders.getUnresolvedMessageParameters();
            statusParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
            statusParameters.triggerIdPathParameter.resolve(TriggerId.fromHexString(triggerId));
            CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> pendingResponse =
                    restClusterClient.sendRequest(statusHeaders, statusParameters, EmptyRequestBody.getInstance());

            // Block for the response once and reuse it for every subsequent check.
            AsynchronousOperationResult<CheckpointInfo> operationResult = pendingResponse.get();
            CheckpointInfo checkpointInfo = operationResult == null ? null : operationResult.resource();
            if (checkpointInfo == null) {
                return CheckpointFetchResult.pending();
            }

            Throwable failureCause = checkpointInfo.getFailureCause();
            if (failureCause != null) {
                LOG.error("Failure occurred while fetching the checkpoint result", failureCause);
                return CheckpointFetchResult.error(failureCause.toString());
            }

            QueueStatus.Id operationStatus = operationResult.queueStatus().getId();
            switch (operationStatus) {
                case IN_PROGRESS:
                    return CheckpointFetchResult.pending();
                case COMPLETED:
                    LOG.info("Checkpoint {} triggered by the operator for job {} completed:", triggerId, jobId);
                    return CheckpointFetchResult.completed();
                default:
                    // Thrown inside the try, so it is reported through the catch below.
                    throw new IllegalStateException(String.format("Checkpoint %s for job %s is reported to be in an unknown status: %s", triggerId, jobId, operationStatus.name()));
            }
        } catch (Exception e) {
            LOG.error("Exception while fetching the checkpoint result", e);
            return CheckpointFetchResult.error(e.getMessage());
        }
    }

    /**
     * Collects basic information about the running cluster.
     *
     * <p>The Flink version and revision are read from the dashboard configuration endpoint; the
     * total CPU and memory entries are computed by {@code FlinkUtils} from the configuration and
     * the number of task managers currently reported by the cluster.
     *
     * @param conf configuration used to reach the cluster and to compute resource usage
     * @return a mutable map with the collected entries
     * @throws Exception if any REST request fails or does not complete within the client timeout
     */
    @Override
    public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
        Map<String, String> info = new HashMap<>();

        try (RestClusterClient<String> restClusterClient = this.getClusterClient(conf)) {
            CompletableFuture<CustomDashboardConfiguration> dashboardFuture =
                    restClusterClient.sendRequest(
                            CustomDashboardConfigurationHeaders.getInstance(),
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance());
            CustomDashboardConfiguration dashboard =
                    dashboardFuture.get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
            info.put("flink-version", dashboard.getFlinkVersion());
            info.put("flink-revision", dashboard.getFlinkRevision());
        }

        // The task manager count is fetched with a separate client, after the first one is closed.
        int taskManagerCount = this.getTaskManagersInfo(conf).getTaskManagerInfos().size();
        String totalCpu = String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerCount));
        info.put(FIELD_NAME_TOTAL_CPU, totalCpu);
        String totalMemory = String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerCount));
        info.put(FIELD_NAME_TOTAL_MEMORY, totalMemory);
        return info;
    }

    /**
     * Looks up the JobManager pods using the namespace and cluster id stored in the configuration.
     *
     * @param deployment the deployment resource; not consulted by this implementation
     * @param conf configuration providing the Kubernetes namespace and cluster id
     * @return whatever the {@code (namespace, clusterId)} overload returns for those values
     */
    @Override
    public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
        return this.getJmPodList(
                conf.getString(KubernetesConfigOptions.NAMESPACE),
                conf.getString(KubernetesConfigOptions.CLUSTER_ID));
    }

    /**
     * Creates a REST client for the cluster described by the configuration.
     *
     * <p>The target host is the operator's service host override when one is configured, otherwise
     * the namespaced external service name derived from the cluster id and namespace. When REST SSL
     * is enabled the client is built from the operator-specific REST configuration instead of the
     * given one.
     *
     * @param conf configuration providing cluster id, namespace, REST port and SSL settings
     * @return a new client; the caller is responsible for closing it
     * @throws Exception if the operator REST configuration or the client cannot be created
     */
    @Override
    public RestClusterClient<String> getClusterClient(Configuration conf) throws Exception {
        final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
        final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
        final int port = conf.getInteger(RestOptions.PORT);

        final Configuration clientConf =
                SecurityOptions.isRestSSLEnabled(conf) ? this.getOperatorRestConfig(conf) : conf;

        final String host =
                ObjectUtils.firstNonNull(
                        this.operatorConfig.getFlinkServiceHostOverride(),
                        ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, namespace));
        final String restServerAddress = String.format("http://%s:%s", host, port);
        LOG.debug("Creating RestClusterClient({})", restServerAddress);

        return new RestClusterClient<>(
                clientConf,
                clusterId,
                (ignoredConf, ignoredErrorHandler) -> new StandaloneClientHAServices(restServerAddress));
    }

    /**
     * Submits a previously uploaded jar to the session cluster as a job and waits for the request
     * to complete within the Flink client timeout.
     *
     * <p>The jar id is the part of the uploaded filename after the last {@code "/"}. The uploaded
     * jar is deleted from the cluster afterwards, whether or not the submission succeeded.
     *
     * @param job job specification providing entry class, arguments, parallelism and restore flags
     * @param jobID id to run the job under
     * @param response upload response identifying the jar
     * @param conf configuration used to reach the cluster; also passed along as the job's
     *     configuration when the configured Flink version is 1.17 or newer
     * @param savepoint savepoint path forwarded to the run request
     * @throws FlinkRuntimeException wrapping any failure during submission
     */
    @VisibleForTesting
    protected void runJar(JobSpec job, JobID jobID, JarUploadResponseBody response, Configuration conf, String savepoint) {
        String uploadedFilename = response.getFilename();
        String jarId = uploadedFilename.substring(uploadedFilename.lastIndexOf("/") + 1);
        try (RestClusterClient<String> restClusterClient = this.getClusterClient(conf)) {
            JarRunHeaders runHeaders = JarRunHeaders.getInstance();
            JarRunMessageParameters runParameters = runHeaders.getUnresolvedMessageParameters();
            runParameters.jarIdPathParameter.resolve(jarId);

            String entryClass = job.getEntryClass();
            List<String> programArgs = job.getArgs() == null ? null : Arrays.asList(job.getArgs());
            // Non-positive parallelism is sent as null.
            Integer parallelism = job.getParallelism() > 0 ? Integer.valueOf(job.getParallelism()) : null;
            Map<String, String> jobConfiguration =
                    conf.get(FlinkConfigBuilder.FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17)
                            ? conf.toMap()
                            : null;
            JarRunRequestBody runRequestBody =
                    new JarRunRequestBody(
                            entryClass,
                            null,
                            programArgs,
                            parallelism,
                            jobID,
                            job.getAllowNonRestoredState(),
                            savepoint,
                            RestoreMode.DEFAULT,
                            jobConfiguration);

            LOG.info("Submitting job: {} to session cluster.", jobID);
            restClusterClient
                    .sendRequest(runHeaders, runParameters, runRequestBody)
                    .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.error("Failed to submit job to session cluster.", e);
            throw new FlinkRuntimeException(e);
        } finally {
            this.deleteJar(conf, jarId);
        }
    }

    /**
     * Fetches the job jar to a local directory and uploads it to the session cluster.
     *
     * <p>The upload goes to the operator's service host override when one is configured, otherwise
     * to the namespaced external service of the target deployment. The locally fetched jar file is
     * deleted once the upload attempt has finished, whether it succeeded or failed.
     *
     * @param objectMeta metadata of the session job resource (provides the namespace)
     * @param spec session job spec (provides the job and the target deployment name)
     * @param conf configuration used for fetching the artifact and creating the REST client
     * @return the response body of the jar upload request
     * @throws Exception if the jar cannot be fetched, does not exist locally, or the upload fails
     *     or does not complete within the Flink client timeout
     */
    @VisibleForTesting
    protected JarUploadResponseBody uploadJar(ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
        String targetDir = this.artifactManager.generateJarDir(objectMeta, spec);
        File jarFile = this.artifactManager.fetch(this.findJarURI(spec.getJob()), conf, targetDir);
        Preconditions.checkArgument(jarFile.exists(), String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
        JarUploadHeaders headers = JarUploadHeaders.getInstance();
        String clusterId = spec.getDeploymentName();
        String namespace = objectMeta.getNamespace();
        int port = conf.getInteger(RestOptions.PORT);
        String host =
                ObjectUtils.firstNonNull(
                        this.operatorConfig.getFlinkServiceHostOverride(),
                        ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, namespace));
        // try-with-resources closes the client (attaching close failures as suppressed exceptions)
        // before the finally block removes the local jar file.
        try (RestClient restClient = this.getRestClient(conf)) {
            return restClient
                    .sendRequest(
                            host,
                            port,
                            headers,
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance(),
                            Collections.singletonList(new FileUpload(jarFile.toPath(), "application/java-archive")))
                    .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        } finally {
            LOG.debug("Deleting the jar file {}", jarFile);
            FileUtils.deleteFileOrDirectory(jarFile);
        }
    }

    /**
     * Creates a low-level REST client backed by this service's executor.
     *
     * @param conf cluster configuration; replaced by the operator-specific REST configuration when
     *     REST SSL is enabled
     * @return a new client; the caller is responsible for closing it
     * @throws Exception if the operator REST configuration or the client cannot be created
     */
    @VisibleForTesting
    protected RestClient getRestClient(Configuration conf) throws Exception {
        Configuration clientConf =
                SecurityOptions.isRestSSLEnabled(conf) ? this.getOperatorRestConfig(conf) : conf;
        return new RestClient(clientConf, this.executorService);
    }

    /**
     * Returns the jar URI of the job, falling back to {@code EMPTY_JAR} when none is set.
     *
     * @param jobSpec job specification to read the jar URI from
     * @return the configured jar URI, or {@code EMPTY_JAR} if the spec has no jar URI
     */
    private String findJarURI(JobSpec jobSpec) {
        return jobSpec.getJarURI() != null ? jobSpec.getJarURI() : EMPTY_JAR;
    }

    /**
     * Deletes an uploaded jar from the cluster, best effort.
     *
     * <p>Failures (including a timeout of the request) are logged and not propagated.
     *
     * @param conf configuration used to reach the cluster
     * @param jarId id of the jar to delete
     */
    private void deleteJar(Configuration conf, String jarId) {
        LOG.debug("Deleting the jar: {}", jarId);
        try (RestClusterClient<String> restClusterClient = this.getClusterClient(conf)) {
            JarDeleteHeaders deleteHeaders = JarDeleteHeaders.getInstance();
            JarDeleteMessageParameters deleteParameters = deleteHeaders.getUnresolvedMessageParameters();
            deleteParameters.jarIdPathParameter.resolve(jarId);
            restClusterClient
                    .sendRequest(deleteHeaders, deleteParameters, EmptyRequestBody.getInstance())
                    .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.error("Failed to delete the jar: {}.", jarId, e);
        }
    }

    /**
     * Deletes a Deployment with the given propagation policy and waits for it to go away.
     *
     * @param name label used in the log messages for this operation
     * @param deployment handle of the Deployment to delete
     * @param propagation deletion propagation policy to apply
     * @param timeout maximum time to wait for the deletion
     * @return the part of {@code timeout} left over, as computed by {@code deleteBlocking}
     */
    @VisibleForTesting
    protected Duration deleteDeploymentBlocking(String name, Resource<Deployment> deployment, DeletionPropagation propagation, Duration timeout) {
        String operation = String.format("Deleting %s Deployment", name);
        // The same resource handle is returned so that deleteBlocking can wait on it.
        Callable<Waitable> deleteAction = () -> {
            deployment.withPropagationPolicy(propagation).delete();
            return deployment;
        };
        return deleteBlocking(operation, deleteAction, timeout);
    }

    /**
     * Copies a configuration, leaving out every entry whose key starts with
     * {@code "kubernetes.operator."}.
     *
     * @param config configuration to filter; not modified
     * @return a new configuration containing the remaining entries as strings
     */
    @VisibleForTesting
    protected static Configuration removeOperatorConfigs(Configuration config) {
        Configuration filtered = new Configuration();
        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
            if (entry.getKey().startsWith("kubernetes.operator.")) {
                continue;
            }
            filtered.setString(entry.getKey(), entry.getValue());
        }
        return filtered;
    }

    /**
     * Ensures HA metadata is available before a restore from last state is attempted.
     *
     * @param conf configuration passed to {@code isHaMetadataAvailable}
     * @throws RecoveryFailureException with reason {@code "RestoreFailed"} if no HA metadata is
     *     available
     */
    private void validateHaMetadataExists(Configuration conf) {
        if (this.isHaMetadataAvailable(conf)) {
            return;
        }
        throw new RecoveryFailureException("HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ", "RestoreFailed");
    }

    /**
     * Creates a jar file that contains only an empty manifest, inside a fresh temporary directory
     * whose name starts with {@code "flink"}.
     *
     * @return the path of the created {@code empty.jar}
     * @throws RuntimeException wrapping any failure while creating the directory or the jar
     */
    private static String createEmptyJar() {
        try {
            String emptyJarPath = Files.createTempDirectory("flink").toString() + "/empty.jar";
            LOG.debug("Creating empty jar to {}", emptyJarPath);
            // Both streams are managed resources: the JarOutputStream constructor already writes
            // the manifest and may fail, in which case the file stream must still be closed.
            try (OutputStream fileOut = new FileOutputStream(emptyJarPath);
                    JarOutputStream jarOut = new JarOutputStream(fileOut, new Manifest())) {
                // Nothing to add; closing the stream finalizes the archive.
            }
            return emptyJarPath;
        } catch (Exception e) {
            throw new RuntimeException("Failed to create empty jar", e);
        }
    }

    /**
     * Fetches the requested job-level metrics from the cluster REST API.
     *
     * @param conf configuration used to reach the cluster
     * @param jobId hex string of the job id
     * @param metricNames names of the metrics to request
     * @return map from metric id to metric value as returned by the cluster
     * @throws Exception if the request fails or does not complete within the Flink client timeout
     */
    @Override
    public Map<String, String> getMetrics(Configuration conf, String jobId, List<String> metricNames) throws Exception {
        try (RestClusterClient<String> restClusterClient = this.getClusterClient(conf)) {
            JobMetricsHeaders metricsHeaders = JobMetricsHeaders.getInstance();
            JobMetricsMessageParameters metricsParameters = metricsHeaders.getUnresolvedMessageParameters();
            metricsParameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
            metricsParameters.metricsFilterParameter.resolve(metricNames);

            MetricCollectionResponseBody metricsResponse =
                    restClusterClient
                            .sendRequest(metricsHeaders, metricsParameters, EmptyRequestBody.getInstance())
                            .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);

            return metricsResponse.getMetrics().stream()
                    .collect(Collectors.toMap(metric -> metric.getId(), metric -> metric.getValue()));
        }
    }

    /**
     * Requests the task manager overview from the cluster REST API.
     *
     * @param conf configuration used to reach the cluster
     * @return the task managers info returned by the cluster
     * @throws Exception if the request fails or does not complete within the Flink client timeout
     */
    private TaskManagersInfo getTaskManagersInfo(Configuration conf) throws Exception {
        try (RestClusterClient<String> restClusterClient = this.getClusterClient(conf)) {
            return restClusterClient
                    .sendRequest(
                            TaskManagersHeaders.getInstance(),
                            EmptyMessageParameters.getInstance(),
                            EmptyRequestBody.getInstance())
                    .get(this.operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Deletes a cluster deployment and updates the resource status accordingly.
     *
     * <p>Steps, in order: delete the cluster via {@code deleteClusterInternal} using the operator's
     * configured deletion propagation, optionally delete the HA metadata, then update the status
     * via {@code updateStatusAfterClusterDeletion}.
     *
     * @param meta metadata of the deployment; its namespace and name identify the cluster
     * @param status deployment status to update after the deletion
     * @param conf configuration of the cluster being deleted
     * @param deleteHaData whether the HA metadata should be removed as well
     */
    @Override
    public final void deleteClusterDeployment(ObjectMeta meta, FlinkDeploymentStatus status, Configuration conf, boolean deleteHaData) {
        final String ns = meta.getNamespace();
        final String cluster = meta.getName();
        final DeletionPropagation propagation = this.operatorConfig.getDeletionPropagation();

        LOG.info("Deleting cluster with {} propagation", propagation);
        this.deleteClusterInternal(ns, cluster, conf, propagation);

        if (!deleteHaData) {
            LOG.info("Keeping HA metadata for last-state restore");
        } else {
            this.deleteHAData(ns, cluster, conf);
        }

        this.updateStatusAfterClusterDeletion(status);
    }

    /**
     * Hook that performs the actual cluster deletion; what exactly gets deleted is up to the
     * implementing subclass.
     *
     * <p>{@code deleteClusterDeployment} invokes it with the resource's namespace, the resource's
     * name as cluster id, the cluster configuration and the operator's configured deletion
     * propagation, in that order.
     *
     * @param var1 namespace of the cluster
     * @param var2 cluster id
     * @param var3 configuration of the cluster being deleted
     * @param var4 deletion propagation policy to apply
     */
    protected abstract void deleteClusterInternal(String var1, String var2, Configuration var3, DeletionPropagation var4);

    /**
     * Deletes the HA metadata of a cluster for whichever HA backend is activated.
     *
     * <p>Kubernetes HA is checked first, then ZooKeeper HA; if neither is activated nothing happens.
     *
     * @param namespace namespace of the cluster
     * @param clusterId id of the cluster
     * @param conf configuration used to detect the HA backend
     */
    protected void deleteHAData(String namespace, String clusterId, Configuration conf) {
        if (FlinkUtils.isKubernetesHAActivated(conf)) {
            LOG.info("Deleting Kubernetes HA metadata");
            FlinkUtils.deleteKubernetesHAMetadata(clusterId, namespace, this.kubernetesClient);
            return;
        }
        if (FlinkUtils.isZookeeperHAActivated(conf)) {
            LOG.info("Deleting Zookeeper HA metadata");
            FlinkUtils.deleteZookeeperHAMetadata(conf);
        }
    }

    /**
     * Reflects a completed cluster deletion in the deployment status.
     *
     * <p>The JobManager deployment is marked {@code MISSING}. The job state is set to
     * {@code FINISHED} unless it already holds a globally terminal state.
     *
     * @param status deployment status to update in place
     */
    protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);

        String state = status.getJobStatus().getState();
        boolean alreadyTerminal =
                state != null
                        && org.apache.flink.api.common.JobStatus.valueOf(state).isGloballyTerminalState();
        if (alreadyTerminal) {
            return;
        }
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
    }

    /**
     * Builds the configuration the operator uses for SSL-enabled REST connections.
     *
     * <p>Works on a copy of the given configuration. If the {@code OPERATOR_TRUSTSTORE_PATH}
     * environment value is absent or points to a non-existing file, the copy is returned unchanged.
     * Otherwise the REST truststore (and, when REST SSL authentication is enabled and
     * {@code OPERATOR_KEYSTORE_PATH} is present, the REST keystore) is taken from the operator
     * environment, and the general SSL truststore/keystore options are removed.
     *
     * @param origConfig configuration to derive from; not modified
     * @return the adjusted copy
     * @throws IOException declared by the signature; nothing in this body throws it directly
     */
    private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException {
        Configuration restConfig = new Configuration(origConfig);

        Optional<String> truststorePath = EnvUtils.get("OPERATOR_TRUSTSTORE_PATH");
        if (!truststorePath.isPresent()) {
            return restConfig;
        }
        if (Files.notExists(Paths.get(truststorePath.get()))) {
            return restConfig;
        }

        restConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE, EnvUtils.getRequired("OPERATOR_TRUSTSTORE_PATH"));
        // The truststore password is read from the keystore password variable.
        restConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, EnvUtils.getRequired("OPERATOR_KEYSTORE_PASSWORD"));

        boolean useOperatorKeystore =
                SecurityOptions.isRestSSLAuthenticationEnabled(restConfig)
                        && EnvUtils.get("OPERATOR_KEYSTORE_PATH").isPresent();
        if (useOperatorKeystore) {
            restConfig.set(SecurityOptions.SSL_REST_KEYSTORE, EnvUtils.getRequired("OPERATOR_KEYSTORE_PATH"));
            restConfig.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, EnvUtils.getRequired("OPERATOR_KEYSTORE_PASSWORD"));
            restConfig.set(SecurityOptions.SSL_REST_KEY_PASSWORD, EnvUtils.getRequired("OPERATOR_KEYSTORE_PASSWORD"));
        } else {
            restConfig.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
            restConfig.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
        }

        restConfig.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
        restConfig.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
        restConfig.removeConfig(SecurityOptions.SSL_KEYSTORE);
        restConfig.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
        return restConfig;
    }

    /**
     * Runs a delete action and waits, up to the given timeout, for the deleted resource to vanish.
     *
     * <p>A {@code KubernetesClientException} with code 404 is ignored both when issuing the delete
     * and while waiting. Any other client exception from the delete action is rethrown, whereas
     * one raised while waiting is only logged as a warning.
     *
     * @param operation description of the operation, used in log messages
     * @param delete action issuing the delete; returns the handle to wait on, or {@code null} to
     *     skip waiting
     * @param timeout maximum time to wait for the resource to disappear
     * @return the part of {@code timeout} not consumed by this call, never negative
     */
    protected static Duration deleteBlocking(String operation, Callable<Waitable> delete, Duration timeout) {
        LOG.info("{} with {} seconds timeout...", operation, timeout.toSeconds());
        final long startMillis = System.currentTimeMillis();

        // NOTE(review): Callable.call() declares a checked Exception that is neither caught nor
        // declared here; presumably the original source used a sneaky-throw mechanism - confirm.
        Waitable pendingDeletion = null;
        try {
            pendingDeletion = delete.call();
        } catch (KubernetesClientException kce) {
            if (kce.getCode() != 404) {
                throw kce;
            }
        }

        if (pendingDeletion != null) {
            try {
                pendingDeletion.waitUntilCondition(Objects::isNull, timeout.toMillis(), TimeUnit.MILLISECONDS);
                LOG.info("Completed {}", operation);
            } catch (KubernetesClientException kce) {
                if (kce.getCode() != 404) {
                    LOG.warn("Error while " + operation, kce);
                }
            }
        }

        long remainingMillis = timeout.toMillis() - (System.currentTimeMillis() - startMillis);
        return Duration.ofMillis(Math.max(0L, remainingMillis));
    }

    /**
     * Compiler-generated lambda body: records the given location as the latest savepoint of the
     * job status, with trigger type {@code UPGRADE}.
     *
     * @param jobStatus job status whose savepoint info is updated
     * @param location savepoint location to record
     */
    private static /* synthetic */ void lambda$cancelSessionJob$1(JobStatus jobStatus, String location) {
        final Savepoint upgradeSavepoint = Savepoint.of(location, SnapshotTriggerType.UPGRADE);
        jobStatus.getSavepointInfo().updateLastSavepoint(upgradeSavepoint);
    }

    /**
     * Compiler-generated lambda body: records the given location as the latest savepoint of the
     * deployment's job status, with trigger type {@code UPGRADE}.
     *
     * @param savepointFormatType Flink core format type; mapped by name onto the operator's
     *     {@code SavepointFormatType}
     * @param deploymentStatus deployment status whose savepoint info is updated
     * @param location savepoint location to record
     */
    private static /* synthetic */ void lambda$cancelJob$0(org.apache.flink.core.execution.SavepointFormatType savepointFormatType, FlinkDeploymentStatus deploymentStatus, String location) {
        final SavepointFormatType operatorFormatType = SavepointFormatType.valueOf(savepointFormatType.name());
        final Savepoint upgradeSavepoint = Savepoint.of(location, SnapshotTriggerType.UPGRADE, operatorFormatType);
        deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(upgradeSavepoint);
    }
}

