/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.config;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.client.utils.Serialization;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkConfigBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigBuilder.class);
    public static final ConfigOption<FlinkVersion> FLINK_VERSION = ConfigOptions.key((String)"$internal.flink.version").enumType(FlinkVersion.class).noDefaultValue();
    protected static final String GENERATED_FILE_PREFIX = "flink_op_generated_";
    protected static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5L);
    private final String namespace;
    private final String clusterId;
    private final FlinkDeploymentSpec spec;
    private final Configuration effectiveConfig;

    /**
     * Creates a builder from a deployment resource, reading the namespace and name from its
     * metadata and delegating to the four-argument constructor.
     *
     * @param deployment resource providing the namespace, the name (used as cluster id) and the spec
     * @param flinkConf configuration instance the builder writes into
     */
    protected FlinkConfigBuilder(FlinkDeployment deployment, Configuration flinkConf) {
        this(
                deployment.getMetadata().getNamespace(),
                deployment.getMetadata().getName(),
                (FlinkDeploymentSpec) deployment.getSpec(),
                flinkConf);
    }

    /**
     * Creates a builder that applies {@code spec} onto {@code flinkConf}.
     *
     * <p>The passed configuration is stored as-is (no copy), so every {@code apply*} call writes
     * directly into it.
     *
     * @param namespace namespace written to the configuration by {@link #build()}
     * @param clusterId cluster id written to the configuration by {@link #build()}
     * @param spec deployment spec to read settings from
     * @param flinkConf configuration instance the builder writes into
     */
    protected FlinkConfigBuilder(
            String namespace, String clusterId, FlinkDeploymentSpec spec, Configuration flinkConf) {
        this.effectiveConfig = flinkConf;
        this.spec = spec;
        this.clusterId = clusterId;
        this.namespace = namespace;
    }

    /**
     * Writes the container image from the spec into the configuration, if one is given.
     *
     * @return this builder
     */
    protected FlinkConfigBuilder applyImage() {
        final String image = this.spec.getImage();
        if (StringUtils.isNullOrWhitespaceOnly(image)) {
            return this;
        }
        final String imageKey;
        if (this.spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17)) {
            imageKey = KubernetesConfigOptions.CONTAINER_IMAGE.key();
        } else {
            // Versions older than 1.17 get this literal key rather than the option's own key.
            imageKey = "kubernetes.container.image";
        }
        this.effectiveConfig.setString(imageKey, image);
        return this;
    }

    /**
     * Writes the image pull policy from the spec into the configuration, if one is given.
     *
     * @return this builder
     * @throws IllegalArgumentException if the spec value does not name an
     *     {@code KubernetesConfigOptions.ImagePullPolicy} constant
     */
    protected FlinkConfigBuilder applyImagePullPolicy() {
        final String pullPolicy = this.spec.getImagePullPolicy();
        if (!StringUtils.isNullOrWhitespaceOnly(pullPolicy)) {
            // No (Object) cast on the value: it would not type-check against the typed option.
            this.effectiveConfig.set(
                    KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
                    KubernetesConfigOptions.ImagePullPolicy.valueOf(pullPolicy));
        }
        return this;
    }

    /**
     * Copies the user-provided Flink configuration from the spec and then fills in operator
     * defaults.
     *
     * <p>Values set through {@code setDefaultConf} only take effect when the option is not already
     * present. {@code SHUTDOWN_ON_APPLICATION_FINISH} (for job deployments) and
     * {@link #FLINK_VERSION} are always overwritten.
     *
     * @return this builder
     */
    protected FlinkConfigBuilder applyFlinkConfiguration() {
        // User-provided entries go in first so the defaults below never override them.
        if (this.spec.getFlinkConfiguration() != null && !this.spec.getFlinkConfiguration().isEmpty()) {
            this.spec.getFlinkConfiguration().forEach(this.effectiveConfig::setString);
        }

        this.setDefaultConf(
                KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
                KubernetesConfigOptions.ServiceExposedType.ClusterIP);
        this.setDefaultConf(WebOptions.CANCEL_ENABLE, false);

        if (this.spec.getJob() != null) {
            // The pipeline name defaults to the cluster id.
            this.setDefaultConf(PipelineOptions.NAME, this.clusterId);
            if (this.spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
                this.setDefaultConf(
                        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                        DEFAULT_CHECKPOINTING_INTERVAL);
            }
            // Unconditional: overrides any user-provided value.
            this.effectiveConfig.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, false);
            if (HighAvailabilityMode.isHighAvailabilityModeActivated(this.effectiveConfig)) {
                this.setDefaultConf(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
            }
            this.setDefaultConf(
                    ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }

        this.effectiveConfig.set(FLINK_VERSION, this.spec.getFlinkVersion());
        return this;
    }

    /**
     * Writes the log configuration from the spec to a generated directory and points
     * {@code DeploymentOptionsInternal.CONF_DIR} at it. Does nothing when the spec has no log
     * configuration.
     *
     * @return this builder
     * @throws IOException if the generated files cannot be created
     */
    protected FlinkConfigBuilder applyLogConfiguration() throws IOException {
        if (this.spec.getLogConfiguration() == null) {
            return this;
        }
        final String log4jContent =
                (String) this.spec.getLogConfiguration().get("log4j-console.properties");
        final String logbackContent =
                (String) this.spec.getLogConfiguration().get("logback-console.xml");
        final String generatedConfDir =
                FlinkConfigBuilder.createLogConfigFiles(log4jContent, logbackContent);
        this.effectiveConfig.setString(DeploymentOptionsInternal.CONF_DIR, generatedConfDir);
        return this;
    }

    /**
     * Resolves the JobManager and TaskManager pod templates, writes them to generated files and
     * stores the file paths in the configuration.
     *
     * <p>For each component the common pod template is merged with the component-specific one and
     * the component's resource spec is applied. When the component is not specified, a clone of the
     * common template is used. If the JobManager startup probe option is enabled, a probe is added
     * to the JobManager template, which is created if absent. When the TaskManager template equals
     * the JobManager template, the JobManager file is reused instead of writing a second one.
     *
     * @return this builder
     * @throws IOException if a template file cannot be written
     */
    protected FlinkConfigBuilder applyPodTemplate() throws IOException {
        final PodTemplateSpec commonPodTemplate = this.spec.getPodTemplate();
        final boolean mergeByName =
                this.effectiveConfig.get(KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME);

        PodTemplateSpec jmPodTemplate;
        if (this.spec.getJobManager() != null) {
            jmPodTemplate =
                    FlinkUtils.mergePodTemplates(
                            commonPodTemplate,
                            this.spec.getJobManager().getPodTemplate(),
                            mergeByName);
            jmPodTemplate =
                    FlinkConfigBuilder.applyResourceToPodTemplate(
                            jmPodTemplate, this.spec.getJobManager().getResource());
        } else {
            jmPodTemplate = ReconciliationUtils.clone(commonPodTemplate);
        }

        if (this.effectiveConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED)) {
            if (jmPodTemplate == null) {
                jmPodTemplate = new PodTemplateSpec();
            }
            FlinkUtils.addStartupProbe(jmPodTemplate);
        }

        String jmTemplateFile = null;
        if (jmPodTemplate != null) {
            jmTemplateFile = FlinkConfigBuilder.createTempFile(jmPodTemplate);
            this.effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, jmTemplateFile);
        }

        PodTemplateSpec tmPodTemplate;
        if (this.spec.getTaskManager() != null) {
            tmPodTemplate =
                    FlinkUtils.mergePodTemplates(
                            commonPodTemplate,
                            this.spec.getTaskManager().getPodTemplate(),
                            mergeByName);
            tmPodTemplate =
                    FlinkConfigBuilder.applyResourceToPodTemplate(
                            tmPodTemplate, this.spec.getTaskManager().getResource());
        } else {
            tmPodTemplate = ReconciliationUtils.clone(commonPodTemplate);
        }

        if (tmPodTemplate != null) {
            // Reuse the JobManager file when both templates are equal.
            final String tmTemplateFile =
                    tmPodTemplate.equals(jmPodTemplate)
                            ? jmTemplateFile
                            : FlinkConfigBuilder.createTempFile(tmPodTemplate);
            this.effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, tmTemplateFile);
        }
        return this;
    }

    /**
     * Forces the REST service exposed type to {@code ClusterIP} when the spec defines an ingress.
     *
     * @return this builder
     */
    protected FlinkConfigBuilder applyIngressDomain() {
        if (this.spec.getIngress() != null) {
            // Unlike the default set in applyFlinkConfiguration, this overrides any existing value.
            this.effectiveConfig.set(
                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
        }
        return this;
    }

    /**
     * Writes the service account from the spec into the configuration, if one is given.
     *
     * @return this builder
     */
    protected FlinkConfigBuilder applyServiceAccount() {
        if (this.spec.getServiceAccount() != null) {
            this.effectiveConfig.set(
                    KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
                    this.spec.getServiceAccount());
        }
        return this;
    }

    /**
     * Applies the JobManager resource settings and replica count from the spec.
     *
     * <p>The replica count is only written when it is greater than zero.
     *
     * @return this builder
     */
    protected FlinkConfigBuilder applyJobManagerSpec() {
        if (this.spec.getJobManager() == null) {
            return this;
        }
        this.setResource(this.spec.getJobManager().getResource(), this.effectiveConfig, true);
        if (this.spec.getJobManager().getReplicas() > 0) {
            this.effectiveConfig.set(
                    KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS,
                    this.spec.getJobManager().getReplicas());
        }
        return this;
    }

    /**
     * Applies the TaskManager resource settings and replica count from the spec.
     *
     * <p>For standalone job deployments without an explicit replica count, the number of
     * TaskManagers is computed by {@code FlinkUtils.getNumTaskManagers} from the configuration and
     * the job parallelism.
     *
     * @return this builder
     */
    protected FlinkConfigBuilder applyTaskManagerSpec() {
        if (this.spec.getTaskManager() != null) {
            this.setResource(this.spec.getTaskManager().getResource(), this.effectiveConfig, false);
            if (this.spec.getTaskManager().getReplicas() != null
                    && this.spec.getTaskManager().getReplicas() > 0) {
                this.effectiveConfig.set(
                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
                        this.spec.getTaskManager().getReplicas());
            }
        }

        final boolean standaloneJob =
                this.spec.getJob() != null
                        && KubernetesDeploymentMode.getDeploymentMode(this.spec)
                                == KubernetesDeploymentMode.STANDALONE;
        if (standaloneJob
                && !this.effectiveConfig.contains(
                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)) {
            this.effectiveConfig.set(
                    StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
                    FlinkUtils.getNumTaskManagers(this.effectiveConfig, this.getParallelism()));
        }
        return this;
    }

    /**
     * Configures the deployment target and, for job deployments, the job-specific options (jar,
     * parallelism, non-restored-state flag, entry class and arguments).
     *
     * <p>In standalone mode the target is overwritten with {@code "remote"}, the cluster mode is
     * recorded, and the jar is additionally put on the classpath.
     *
     * @return this builder
     * @throws URISyntaxException if the job's jar URI cannot be parsed
     */
    protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
        final KubernetesDeploymentMode deploymentMode =
                KubernetesDeploymentMode.getDeploymentMode(this.spec);

        if (this.spec.getJob() != null) {
            final JobSpec jobSpec = this.spec.getJob();
            this.effectiveConfig.set(
                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
            if (jobSpec.getJarURI() != null) {
                // Parsing validates the URI; the configuration stores its string form.
                final URI uri = new URI(jobSpec.getJarURI());
                this.effectiveConfig.set(
                        PipelineOptions.JARS, Collections.singletonList(uri.toString()));
            }
            this.effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, this.getParallelism());
            if (jobSpec.getAllowNonRestoredState() != null) {
                this.effectiveConfig.set(
                        SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
                        jobSpec.getAllowNonRestoredState());
            }
            if (jobSpec.getEntryClass() != null) {
                this.effectiveConfig.set(
                        ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
            }
            if (jobSpec.getArgs() != null) {
                this.effectiveConfig.set(
                        ApplicationConfiguration.APPLICATION_ARGS,
                        Arrays.asList(jobSpec.getArgs()));
            }
        } else {
            this.effectiveConfig.set(
                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
        }

        if (deploymentMode == KubernetesDeploymentMode.STANDALONE) {
            // Standalone overrides the target chosen above.
            this.effectiveConfig.set(DeploymentOptions.TARGET, "remote");
            this.effectiveConfig.set(
                    StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
                    this.spec.getJob() == null
                            ? StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION
                            : StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
            if (this.spec.getJob() != null && this.spec.getJob().getJarURI() != null) {
                this.effectiveConfig.set(
                        PipelineOptions.CLASSPATHS,
                        Collections.singletonList(this.getStandaloneJarURI(this.spec.getJob())));
            }
        }
        return this;
    }

    /**
     * Returns the job's jar URI in the form used for the standalone classpath: a {@code local}
     * scheme is rewritten to {@code file}; any other URI is returned unchanged.
     *
     * @param jobSpec job spec whose jar URI is converted; the URI must not be null
     * @return the ASCII string form of the (possibly rewritten) URI
     * @throws URISyntaxException if the jar URI cannot be parsed or rebuilt
     */
    private String getStandaloneJarURI(JobSpec jobSpec) throws URISyntaxException {
        URI uri = new URI(jobSpec.getJarURI());
        // Constant-first comparison: URI#getScheme() is null for scheme-less (relative) URIs,
        // which must be passed through rather than fail with a NullPointerException.
        if ("local".equals(uri.getScheme())) {
            final String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
            uri = new URI("file", authority, uri.getPath(), uri.getQuery(), uri.getFragment());
        }
        return uri.toASCIIString();
    }

    /**
     * Determines the job parallelism.
     *
     * <p>When TaskManager replicas are set, the result is replicas times the configured number of
     * task slots, and a positive job parallelism is ignored with a warning. Otherwise the largest
     * positive value from the parallelism overrides wins, falling back to the job's parallelism.
     *
     * @return the parallelism to use for the job
     */
    private int getParallelism() {
        final boolean replicasConfigured =
                this.spec.getTaskManager() != null
                        && this.spec.getTaskManager().getReplicas() != null;
        if (!replicasConfigured) {
            return this.getMaxParallelismFromOverrideConfig()
                    .filter(overrideParallelism -> overrideParallelism > 0)
                    .orElseGet(() -> this.spec.getJob().getParallelism());
        }

        if (this.spec.getJob().getParallelism() > 0) {
            LOG.warn("Job parallelism setting is ignored as TaskManager replicas are set");
        }
        final int replicas = this.spec.getTaskManager().getReplicas();
        final int slotsPerTaskManager = this.effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
        return replicas * slotsPerTaskManager;
    }

    /**
     * Returns the largest value found in {@code PipelineOptions.PARALLELISM_OVERRIDES}.
     *
     * @return the maximum override, or empty when the option is unset or has no entries
     * @throws NumberFormatException if an override value is not a valid integer
     */
    private Optional<Integer> getMaxParallelismFromOverrideConfig() {
        return this.effectiveConfig
                .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
                .flatMap(
                        overridesByVertex ->
                                overridesByVertex.values().stream()
                                        .map(Integer::valueOf)
                                        .max(Integer::compare));
    }

    /**
     * Finalizes the configuration by writing the namespace and cluster id and, when high
     * availability is activated, the HA cluster id.
     *
     * @return the configuration instance this builder has been writing into
     */
    protected Configuration build() {
        final Configuration conf = this.effectiveConfig;
        conf.setString(KubernetesConfigOptions.NAMESPACE, this.namespace);
        conf.setString(KubernetesConfigOptions.CLUSTER_ID, this.clusterId);

        final boolean haActivated = HighAvailabilityMode.isHighAvailabilityModeActivated(conf);
        if (haActivated) {
            conf.setString(HighAvailabilityOptions.HA_CLUSTER_ID, this.clusterId);
        }
        return conf;
    }

    /**
     * Builds the effective configuration for a deployment by running every {@code apply*} step in
     * a fixed order and then {@link #build()}.
     *
     * @param namespace namespace of the deployment
     * @param clusterId cluster id of the deployment
     * @param spec deployment spec to translate
     * @param flinkConfig base configuration; it is written into and returned
     * @return the resulting configuration
     * @throws IOException if a generated file cannot be written
     * @throws URISyntaxException if the job's jar URI cannot be parsed
     */
    public static Configuration buildFrom(
            String namespace, String clusterId, FlinkDeploymentSpec spec, Configuration flinkConfig)
            throws IOException, URISyntaxException {
        final FlinkConfigBuilder builder =
                new FlinkConfigBuilder(namespace, clusterId, spec, flinkConfig);
        // The step order is significant: later steps read what earlier steps wrote.
        builder.applyFlinkConfiguration();
        builder.applyLogConfiguration();
        builder.applyImage();
        builder.applyImagePullPolicy();
        builder.applyServiceAccount();
        builder.applyPodTemplate();
        builder.applyIngressDomain();
        builder.applyJobManagerSpec();
        builder.applyTaskManagerSpec();
        builder.applyJobOrSessionSpec();
        return builder.build();
    }

    /**
     * Sets {@code option} to {@code value} only when the configuration does not contain it yet.
     *
     * @param option option to default
     * @param value value to use when the option is absent
     * @param <T> value type of the option
     */
    private <T> void setDefaultConf(ConfigOption<T> option, T value) {
        final boolean alreadyConfigured = this.effectiveConfig.contains(option);
        if (alreadyConfigured) {
            return;
        }
        this.effectiveConfig.set(option, value);
    }

    /**
     * Writes the memory and CPU settings of {@code resource} into {@code effectiveConfig}.
     *
     * @param resource resource spec to apply; nothing happens when it is null
     * @param effectiveConfig configuration to write into
     * @param isJM {@code true} to target the JobManager options, {@code false} for the TaskManager
     */
    private void setResource(Resource resource, Configuration effectiveConfig, boolean isJM) {
        if (resource == null) {
            return;
        }
        if (resource.getMemory() != null) {
            // Only the key is needed, so a wildcard avoids a raw ConfigOption type.
            final ConfigOption<?> memoryConfigOption =
                    isJM
                            ? JobManagerOptions.TOTAL_PROCESS_MEMORY
                            : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
            effectiveConfig.setString(
                    memoryConfigOption.key(),
                    FlinkConfigBuilder.parseResourceMemoryString(resource.getMemory()));
        }
        this.configureCpu(resource, effectiveConfig, isJM);
    }

    /**
     * Converts a memory string from the resource spec into the string stored in the Flink
     * configuration.
     *
     * <p>The value is first parsed with {@code MemorySize.parse}. If that rejects it with an
     * {@link IllegalArgumentException}, the value is normalized and parsed with
     * {@code Quantity.parse}, and the numerical amount is returned as a string.
     *
     * @param memory memory string to parse
     * @return the string form of the parsed memory value
     */
    public static String parseResourceMemoryString(String memory) {
        String parsed;
        try {
            parsed = MemorySize.parse(memory).toString();
        } catch (IllegalArgumentException notFlinkSyntax) {
            // Fall back to parsing the value as a Kubernetes quantity.
            final String k8sQuantity = FlinkConfigBuilder.formatMemoryStringForK8sSpec(memory);
            parsed = String.valueOf(Quantity.parse(k8sQuantity).getNumericalAmount());
        }
        return parsed;
    }

    /**
     * Normalizes a memory string before it is parsed as a quantity.
     *
     * <p>All whitespace is removed and the value is upper-cased; a trailing {@code B} is dropped
     * and a trailing {@code I} is lower-cased, so {@code "2 GiB"} becomes {@code "2Gi"} and
     * {@code "512mb"} becomes {@code "512M"}.
     *
     * @param memory memory string to normalize; must not be null
     * @return the normalized string
     */
    private static String formatMemoryStringForK8sSpec(String memory) {
        // Locale.ROOT keeps the mapping i -> I stable; under a Turkish default locale
        // toUpperCase() yields a dotted capital I and the suffix check below would miss it.
        String memoryQuantity = memory.trim().replaceAll("\\s", "").toUpperCase(Locale.ROOT);
        if (memoryQuantity.endsWith("B")) {
            memoryQuantity = memoryQuantity.substring(0, memoryQuantity.length() - 1);
        }
        if (memoryQuantity.endsWith("I")) {
            memoryQuantity = memoryQuantity.substring(0, memoryQuantity.length() - 1) + "i";
        }
        return memoryQuantity;
    }

    /**
     * Writes the CPU setting of {@code resource} into {@code conf}.
     *
     * <p>The value is always stored under the key of the matching {@code KubernetesConfigOptions}
     * CPU option. For Flink versions older than 1.17 it is additionally stored under the literal
     * {@code kubernetes.jobmanager.cpu} / {@code kubernetes.taskmanager.cpu} key.
     *
     * @param resource resource spec; nothing happens when its CPU value is null
     * @param conf configuration to write into
     * @param isJM {@code true} for the JobManager keys, {@code false} for the TaskManager keys
     */
    private void configureCpu(Resource resource, Configuration conf, boolean isJM) {
        if (resource.getCpu() == null) {
            return;
        }
        // Evaluated before any write, so a failing version lookup leaves conf untouched.
        final boolean legacyKeyNeeded =
                !this.spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17);

        final String optionKey =
                isJM
                        ? KubernetesConfigOptions.JOB_MANAGER_CPU.key()
                        : KubernetesConfigOptions.TASK_MANAGER_CPU.key();
        conf.setDouble(optionKey, resource.getCpu().doubleValue());

        if (legacyKeyNeeded) {
            final String legacyKey = isJM ? "kubernetes.jobmanager.cpu" : "kubernetes.taskmanager.cpu";
            conf.setDouble(legacyKey, resource.getCpu().doubleValue());
        }
    }

    /**
     * Applies the ephemeral storage setting of {@code resource} to the container named
     * {@code flink-main-container} of the pod template.
     *
     * <p>If the template, its spec, or that container is missing, it is created. A given pod
     * template is modified in place.
     *
     * @param podTemplate pod template to decorate; may be null
     * @param resource resource spec; when null or without ephemeral storage the template is
     *     returned untouched
     * @return the decorated template, or a new one when {@code podTemplate} was null
     */
    @VisibleForTesting
    protected static PodTemplateSpec applyResourceToPodTemplate(PodTemplateSpec podTemplate, Resource resource) {
        if (resource == null || StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())) {
            return podTemplate;
        }
        if (podTemplate == null) {
            PodTemplateSpec newPodTemplate = new PodTemplateSpec();
            newPodTemplate.setSpec(FlinkConfigBuilder.createPodSpecWithResource(resource));
            return newPodTemplate;
        }
        if (podTemplate.getSpec() == null) {
            podTemplate.setSpec(FlinkConfigBuilder.createPodSpecWithResource(resource));
            return podTemplate;
        }

        boolean hasMainContainer = false;
        for (Container container : podTemplate.getSpec().getContainers()) {
            // Constant-first comparison so a container without a name is skipped instead of
            // causing a NullPointerException.
            if (!"flink-main-container".equals(container.getName())) {
                continue;
            }
            FlinkConfigBuilder.decorateContainerWithEphemeralStorage(
                    container, resource.getEphemeralStorage());
            hasMainContainer = true;
        }
        if (!hasMainContainer) {
            podTemplate
                    .getSpec()
                    .getContainers()
                    .add(
                            FlinkConfigBuilder.decorateContainerWithEphemeralStorage(
                                    new Container(), resource.getEphemeralStorage()));
        }
        return podTemplate;
    }

    /**
     * Creates a pod spec with a single container that carries the ephemeral storage setting of
     * {@code resource}.
     *
     * @param resource resource spec providing the ephemeral storage value
     * @return the new pod spec
     */
    private static PodSpec createPodSpecWithResource(Resource resource) {
        final Container mainContainer =
                FlinkConfigBuilder.decorateContainerWithEphemeralStorage(
                        new Container(), resource.getEphemeralStorage());
        final PodSpec podSpec = new PodSpec();
        podSpec.getContainers().add(mainContainer);
        return podSpec;
    }

    /**
     * Names the container {@code flink-main-container} and sets its {@code ephemeral-storage}
     * limit and request to the given quantity, keeping any other existing resource entries.
     *
     * @param container container to modify in place
     * @param ephemeralStorage quantity string for both the limit and the request
     * @return the same container instance
     */
    private static Container decorateContainerWithEphemeralStorage(Container container, String ephemeralStorage) {
        container.setName("flink-main-container");

        ResourceRequirements requirements = container.getResources();
        if (requirements == null) {
            requirements = new ResourceRequirements();
        }
        // Parsed separately so the limit and the request do not share one Quantity instance.
        requirements.getLimits().put("ephemeral-storage", Quantity.parse(ephemeralStorage));
        requirements.getRequests().put("ephemeral-storage", Quantity.parse(ephemeralStorage));

        container.setResources(requirements);
        return container;
    }

    /**
     * Creates a temporary directory and writes the given log configurations into it.
     *
     * <p>The files are encoded as UTF-8 regardless of the platform default charset.
     *
     * @param log4jConf content for {@code log4j-console.properties}; skipped when null
     * @param logbackConf content for {@code logback-console.xml}; skipped when null
     * @return absolute path of the created directory
     * @throws IOException if the directory or a file cannot be created
     */
    private static String createLogConfigFiles(String log4jConf, String logbackConf) throws IOException {
        // The directory name prefix is what deleteSilentlyIfGenerated uses to recognize
        // operator-generated paths.
        File tmpDir = Files.createTempDirectory("flink_op_generated_conf_").toFile();
        if (log4jConf != null) {
            File log4jConfFile = new File(tmpDir.getAbsolutePath(), "log4j-console.properties");
            Files.write(log4jConfFile.toPath(), log4jConf.getBytes(StandardCharsets.UTF_8));
        }
        if (logbackConf != null) {
            File logbackConfFile = new File(tmpDir.getAbsolutePath(), "logback-console.xml");
            Files.write(logbackConfFile.toPath(), logbackConf.getBytes(StandardCharsets.UTF_8));
        }
        return tmpDir.getAbsolutePath();
    }

    /**
     * Serializes the pod template to YAML and writes it to a new temporary file.
     *
     * <p>The file is encoded as UTF-8 regardless of the platform default charset.
     *
     * @param podTemplate pod template to serialize
     * @return absolute path of the written file
     * @throws IOException if the file cannot be created or written
     */
    private static String createTempFile(PodTemplateSpec podTemplate) throws IOException {
        // The file name prefix is what deleteSilentlyIfGenerated uses to recognize
        // operator-generated paths.
        File tmp = File.createTempFile("flink_op_generated_podTemplate_", ".yaml");
        Files.write(tmp.toPath(), Serialization.asYaml(podTemplate).getBytes(StandardCharsets.UTF_8));
        return tmp.getAbsolutePath();
    }

    /**
     * Deletes the generated pod template files and log configuration directory referenced by the
     * configuration. Paths whose name lacks the generated-file prefix are left alone.
     *
     * @param configuration configuration whose referenced temporary files are removed
     */
    protected static void cleanupTmpFiles(Configuration configuration) {
        for (ConfigOption<String> pathOption :
                Arrays.asList(
                        KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE,
                        KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
                        DeploymentOptionsInternal.CONF_DIR)) {
            configuration
                    .getOptional(pathOption)
                    .ifPresent(path -> FlinkConfigBuilder.deleteSilentlyIfGenerated(path));
        }
    }

    /**
     * Deletes the given file or directory if its name starts with the generated-file prefix.
     *
     * <p>Best effort: any failure is logged and not propagated.
     *
     * @param file path of the file or directory to delete
     */
    private static void deleteSilentlyIfGenerated(String file) {
        try {
            final File candidate = new File(file);
            if (candidate.getName().startsWith(GENERATED_FILE_PREFIX)) {
                LOG.debug("Deleting tmp config file {}", candidate);
                FileUtils.deleteFileOrDirectory(candidate);
            }
        } catch (Exception cleanupFailure) {
            // Swallowed on purpose: cleanup must never fail the caller.
            LOG.error("Could not clean up file " + file, cleanupFailure);
        }
    }
}

