/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.validation;

import io.fabric8.kubernetes.api.model.Quantity;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.util.StringUtils;

public class DefaultValidator
implements FlinkResourceValidator {
    private static final Pattern DEPLOYMENT_NAME_PATTERN = Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
    private static final String[] FORBIDDEN_CONF_KEYS = new String[]{KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key(), HighAvailabilityOptions.HA_CLUSTER_ID.key()};
    private static final Set<String> ALLOWED_LOG_CONF_KEYS = Set.of("log4j-console.properties", "logback-console.xml");
    private final FlinkConfigManager configManager;

    public DefaultValidator(FlinkConfigManager configManager) {
        this.configManager = configManager;
    }

    @Override
    public Optional<String> validateDeployment(FlinkDeployment deployment) {
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec)deployment.getSpec();
        Map effectiveConfig = this.configManager.getDefaultConfig(deployment.getMetadata().getNamespace(), spec.getFlinkVersion()).toMap();
        if (spec.getFlinkConfiguration() != null) {
            effectiveConfig.putAll(spec.getFlinkConfiguration());
        }
        return DefaultValidator.firstPresent(this.validateDeploymentName(deployment.getMetadata().getName()), this.validateFlinkVersion(deployment), this.validateFlinkDeploymentConfig(effectiveConfig), this.validateIngress(spec.getIngress(), deployment.getMetadata().getName(), deployment.getMetadata().getNamespace()), this.validateLogConfig(spec.getLogConfiguration()), this.validateJobSpec(spec.getJob(), spec.getTaskManager(), effectiveConfig), this.validateJmSpec(spec.getJobManager(), effectiveConfig), this.validateTmSpec(spec.getTaskManager(), effectiveConfig), this.validateSpecChange(deployment, effectiveConfig), this.validateServiceAccount(spec.getServiceAccount()), DefaultValidator.validateAutoScalerFlinkConfiguration(effectiveConfig));
    }

    @SafeVarargs
    private static Optional<String> firstPresent(Optional<String> ... errOpts) {
        for (Optional<String> opt : errOpts) {
            if (!opt.isPresent()) continue;
            return opt;
        }
        return Optional.empty();
    }

    private Optional<String> validateDeploymentName(String name) {
        Matcher matcher = DEPLOYMENT_NAME_PATTERN.matcher(name);
        if (!matcher.matches()) {
            return Optional.of(String.format("The FlinkDeployment name: %s is invalid, must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name',  or 'abc-123'), and the length must be no more than 45 characters.", name));
        }
        return Optional.empty();
    }

    private Optional<String> validateFlinkVersion(FlinkDeployment deployment) {
        JobSpec lastJob;
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec)deployment.getSpec();
        if (spec.getFlinkVersion() == null) {
            return Optional.of("Flink Version must be defined.");
        }
        FlinkDeploymentSpec lastReconciledSpec = (FlinkDeploymentSpec)((FlinkDeploymentStatus)deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        if (lastReconciledSpec != null && lastReconciledSpec.getJob() != null && spec.getJob() != null && spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS && (lastJob = lastReconciledSpec.getJob()).getState() == JobState.SUSPENDED && lastJob.getUpgradeMode() == UpgradeMode.LAST_STATE && lastReconciledSpec.getFlinkVersion() != spec.getFlinkVersion()) {
            return Optional.of("Changing flinkVersion after last-state suspend is not allowed. Restore your cluster with the current flinkVersion and perform the version upgrade afterwards.");
        }
        return Optional.empty();
    }

    private Optional<String> validateIngress(IngressSpec ingress, String name, String namespace) {
        if (ingress == null) {
            return Optional.empty();
        }
        if (ingress.getTemplate() == null) {
            return Optional.of("Ingress template must be defined");
        }
        try {
            IngressUtils.getIngressUrl(ingress.getTemplate(), name, namespace);
        }
        catch (ReconciliationException e) {
            return Optional.of(e.getMessage());
        }
        return Optional.empty();
    }

    private Optional<String> validateFlinkDeploymentConfig(Map<String, String> confMap) {
        if (confMap == null) {
            return Optional.empty();
        }
        Configuration conf = Configuration.fromMap(confMap);
        for (String key : FORBIDDEN_CONF_KEYS) {
            if (!conf.containsKey(key)) continue;
            return Optional.of("Forbidden Flink config key: " + key);
        }
        if (((Boolean)conf.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)).booleanValue() && !HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)conf)) {
            return Optional.of("HA must be enabled for rollback support.");
        }
        if (((Boolean)conf.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)).booleanValue() && !((Boolean)conf.get(KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)).booleanValue()) {
            return Optional.of("Deployment recovery (" + KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED.key() + ") must be enabled for job health check (" + KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key() + ") support.");
        }
        return Optional.empty();
    }

    private Optional<String> validateLogConfig(Map<String, String> confMap) {
        if (confMap == null) {
            return Optional.empty();
        }
        for (String key : confMap.keySet()) {
            if (ALLOWED_LOG_CONF_KEYS.contains(key)) continue;
            return Optional.of(String.format("Invalid log config key: %s. Allowed keys are %s", key, ALLOWED_LOG_CONF_KEYS));
        }
        return Optional.empty();
    }

    private Optional<String> validateJobSpec(JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> confMap) {
        boolean tmReplicasDefined;
        if (job == null) {
            return Optional.empty();
        }
        Configuration configuration = Configuration.fromMap(confMap);
        if (job.getUpgradeMode() == UpgradeMode.LAST_STATE && !HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)configuration)) {
            return Optional.of("Job could not be upgraded with last-state while HA disabled");
        }
        if (job.getUpgradeMode() != UpgradeMode.STATELESS && StringUtils.isNullOrWhitespaceOnly((String)configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY))) {
            return Optional.of(String.format("Checkpoint directory[%s] must be defined for last-state and savepoint upgrade modes", CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
        }
        if (StringUtils.isNullOrWhitespaceOnly((String)configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))) {
            if (job.getUpgradeMode() == UpgradeMode.SAVEPOINT) {
                return Optional.of(String.format("Job could not be upgraded with savepoint while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
            if (job.getSavepointTriggerNonce() != null) {
                return Optional.of(String.format("Savepoint could not be manually triggered for the running job while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
            if (configuration.contains(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
                return Optional.of(String.format("Periodic savepoints cannot be enabled when config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
            if (configuration.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE) != null) {
                return Optional.of(String.format("In order to use max-checkpoint age functionality config key[%s] must be set to allow triggering savepoint upgrades.", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
        }
        boolean bl = tmReplicasDefined = tm != null && tm.getReplicas() != null;
        if (tmReplicasDefined && tm.getReplicas() < 1) {
            return Optional.of("TaskManager replicas must be larger than 0");
        }
        if (!tmReplicasDefined && job.getParallelism() < 1) {
            return Optional.of("Job parallelism must be larger than 0");
        }
        return Optional.empty();
    }

    private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
        Optional<String> jmMemoryValidation;
        Configuration conf = Configuration.fromMap(confMap);
        boolean jmMemoryDefined = jmSpec != null && jmSpec.getResource() != null && !StringUtils.isNullOrWhitespaceOnly((String)jmSpec.getResource().getMemory());
        Optional<String> optional = jmMemoryValidation = jmMemoryDefined ? Optional.empty() : this.validateJmMemoryConfig(conf);
        if (jmSpec == null) {
            return jmMemoryValidation;
        }
        return DefaultValidator.firstPresent(jmMemoryValidation, this.validateResources("JobManager", jmSpec.getResource()), this.validateJmReplicas(jmSpec.getReplicas(), confMap));
    }

    private Optional<String> validateJmMemoryConfig(Configuration conf) {
        try {
            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap((Configuration)conf, (ConfigOption)JobManagerOptions.JVM_HEAP_MEMORY);
        }
        catch (Exception e) {
            return Optional.of("JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
        }
        return Optional.empty();
    }

    private Optional<String> validateJmReplicas(int replicas, Map<String, String> confMap) {
        if (replicas < 1) {
            return Optional.of("JobManager replicas should not be configured less than one.");
        }
        if (replicas > 1 && !HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)Configuration.fromMap(confMap))) {
            return Optional.of("High availability should be enabled when starting standby JobManagers.");
        }
        return Optional.empty();
    }

    private Optional<String> validateTmSpec(TaskManagerSpec tmSpec, Map<String, String> confMap) {
        Optional<String> tmMemoryConfigValidation;
        Configuration conf = Configuration.fromMap(confMap);
        boolean tmMemoryDefined = tmSpec != null && tmSpec.getResource() != null && !StringUtils.isNullOrWhitespaceOnly((String)tmSpec.getResource().getMemory());
        Optional<String> optional = tmMemoryConfigValidation = tmMemoryDefined ? Optional.empty() : this.validateTmMemoryConfig(conf);
        if (tmSpec == null) {
            return tmMemoryConfigValidation;
        }
        return DefaultValidator.firstPresent(tmMemoryConfigValidation, this.validateResources("TaskManager", tmSpec.getResource()), this.validateTmReplicas(tmSpec));
    }

    private Optional<String> validateTmMemoryConfig(Configuration conf) {
        try {
            TaskExecutorProcessUtils.processSpecFromConfig((Configuration)conf);
        }
        catch (Exception e) {
            return Optional.of("TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
        }
        return Optional.empty();
    }

    private Optional<String> validateTmReplicas(TaskManagerSpec tmSpec) {
        if (tmSpec.getReplicas() != null && tmSpec.getReplicas() < 1) {
            return Optional.of("TaskManager replicas should not be configured less than one.");
        }
        return Optional.empty();
    }

    private Optional<String> validateResources(String component, Resource resource) {
        String errorMessage;
        if (resource == null) {
            return Optional.empty();
        }
        String memory = resource.getMemory();
        String storage = resource.getEphemeralStorage();
        StringBuilder builder = new StringBuilder();
        if (memory != null) {
            try {
                MemorySize.parse((String)FlinkConfigBuilder.parseResourceMemoryString(resource.getMemory()));
            }
            catch (IllegalArgumentException iae) {
                builder.append(component + " resource memory parse error: " + iae.getMessage());
            }
        }
        if (storage != null) {
            try {
                Quantity quantity = Quantity.parse((String)storage);
                Quantity.getAmountInBytes((Quantity)quantity);
            }
            catch (IllegalArgumentException iae) {
                builder.append(component + " resource ephemeral storage parse error: " + iae.getMessage());
            }
        }
        if (!StringUtils.isNullOrWhitespaceOnly((String)(errorMessage = builder.toString()))) {
            return Optional.of(errorMessage);
        }
        return Optional.empty();
    }

    private Optional<String> validateSpecChange(FlinkDeployment deployment, Map<String, String> effectiveConfig) {
        KubernetesDeploymentMode newDeploymentMode;
        FlinkDeploymentSpec newSpec = (FlinkDeploymentSpec)deployment.getSpec();
        if (((FlinkDeploymentStatus)deployment.getStatus()).getReconciliationStatus().isBeforeFirstDeployment()) {
            return Optional.empty();
        }
        FlinkDeploymentSpec oldSpec = (FlinkDeploymentSpec)((FlinkDeploymentStatus)deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        if (newSpec.getJob() != null && oldSpec.getJob() == null) {
            return Optional.of("Cannot switch from session to job cluster");
        }
        if (newSpec.getJob() == null && oldSpec.getJob() != null) {
            return Optional.of("Cannot switch from job to session cluster");
        }
        KubernetesDeploymentMode oldDeploymentMode = oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode();
        KubernetesDeploymentMode kubernetesDeploymentMode = newDeploymentMode = newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode();
        if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE && newDeploymentMode != KubernetesDeploymentMode.NATIVE) {
            return Optional.of("Cannot switch from native kubernetes to standalone kubernetes cluster");
        }
        if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE && newDeploymentMode != KubernetesDeploymentMode.STANDALONE) {
            return Optional.of("Cannot switch from standalone kubernetes to native kubernetes cluster");
        }
        JobSpec oldJob = oldSpec.getJob();
        JobSpec newJob = newSpec.getJob();
        if (oldJob != null && newJob != null && StringUtils.isNullOrWhitespaceOnly((String)effectiveConfig.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) && ((FlinkDeploymentStatus)deployment.getStatus()).getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING && ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(deployment, this.configManager.getObserveConfig(deployment))) {
            return Optional.of(String.format("Job could not be upgraded to last-state while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        }
        return Optional.empty();
    }

    @Override
    public Optional<String> validateSessionJob(FlinkSessionJob sessionJob, Optional<FlinkDeployment> sessionOpt) {
        if (sessionOpt.isEmpty()) {
            return this.validateSessionJobOnly(sessionJob);
        }
        return DefaultValidator.firstPresent(this.validateSessionJobOnly(sessionJob), this.validateSessionJobWithCluster(sessionJob, sessionOpt.get()));
    }

    private Optional<String> validateSessionJobOnly(FlinkSessionJob sessionJob) {
        return DefaultValidator.firstPresent(this.validateDeploymentName(((FlinkSessionJobSpec)sessionJob.getSpec()).getDeploymentName()), this.validateJobNotEmpty(sessionJob), this.validateNotLastStateUpgradeMode(sessionJob), this.validateSpecChange(sessionJob));
    }

    private Optional<String> validateSessionJobWithCluster(FlinkSessionJob sessionJob, FlinkDeployment sessionCluster) {
        Map effectiveConfig = this.configManager.getDefaultConfig(sessionJob.getMetadata().getNamespace(), ((FlinkDeploymentSpec)sessionCluster.getSpec()).getFlinkVersion()).toMap();
        if (((FlinkDeploymentSpec)sessionCluster.getSpec()).getFlinkConfiguration() != null) {
            effectiveConfig.putAll(((FlinkDeploymentSpec)sessionCluster.getSpec()).getFlinkConfiguration());
        }
        return DefaultValidator.firstPresent(this.validateNotApplicationCluster(sessionCluster), this.validateSessionClusterId(sessionJob, sessionCluster), this.validateJobSpec(((FlinkSessionJobSpec)sessionJob.getSpec()).getJob(), null, effectiveConfig), DefaultValidator.validateAutoScalerFlinkConfiguration(effectiveConfig));
    }

    private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
        if (((FlinkSessionJobSpec)sessionJob.getSpec()).getJob() == null) {
            return Optional.of("The job spec should not be empty");
        }
        return Optional.empty();
    }

    private Optional<String> validateNotApplicationCluster(FlinkDeployment session) {
        if (((FlinkDeploymentSpec)session.getSpec()).getJob() != null) {
            return Optional.of("Can not submit session job to application cluster");
        }
        return Optional.empty();
    }

    private Optional<String> validateSessionClusterId(FlinkSessionJob sessionJob, FlinkDeployment session) {
        String deploymentName = session.getMetadata().getName();
        if (!deploymentName.equals(((FlinkSessionJobSpec)sessionJob.getSpec()).getDeploymentName())) {
            return Optional.of("The session job's cluster id is not match with the session cluster");
        }
        return Optional.empty();
    }

    private Optional<String> validateNotLastStateUpgradeMode(FlinkSessionJob sessionJob) {
        if (((FlinkSessionJobSpec)sessionJob.getSpec()).getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
            return Optional.of(String.format("The %s upgrade mode is not supported in session job now.", UpgradeMode.LAST_STATE));
        }
        return Optional.empty();
    }

    private Optional<String> validateSpecChange(FlinkSessionJob sessionJob) {
        FlinkSessionJobSpec newSpec = (FlinkSessionJobSpec)sessionJob.getSpec();
        if (((FlinkSessionJobStatus)sessionJob.getStatus()).getReconciliationStatus().isBeforeFirstDeployment()) {
            return Optional.empty();
        }
        FlinkSessionJobSpec lastReconciledSpec = (FlinkSessionJobSpec)((FlinkSessionJobStatus)sessionJob.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        if (!lastReconciledSpec.getDeploymentName().equals(((FlinkSessionJobSpec)sessionJob.getSpec()).getDeploymentName())) {
            return Optional.of("The deploymentName can't be changed");
        }
        return Optional.empty();
    }

    private Optional<String> validateServiceAccount(String serviceAccount) {
        if (serviceAccount == null) {
            return Optional.of("spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.");
        }
        return Optional.empty();
    }

    public static Optional<String> validateAutoScalerFlinkConfiguration(Map<String, String> effectiveConfig) {
        if (effectiveConfig == null) {
            return Optional.empty();
        }
        Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
        if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
            return Optional.empty();
        }
        return DefaultValidator.firstPresent(DefaultValidator.validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0), DefaultValidator.validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0), DefaultValidator.validateNumber(flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION, 0.0), DefaultValidator.validateNumber(flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0));
    }

    private static <T extends Number> Optional<String> validateNumber(Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min, Double max) {
        try {
            Number configValue = (Number)flinkConfiguration.get(autoScalerConfig);
            if (configValue != null) {
                double value = configValue.doubleValue();
                if (min != null && value < min || max != null && value > max) {
                    return Optional.of(String.format("The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]", autoScalerConfig.key(), min != null ? min.toString() : "-Infinity", max != null ? max.toString() : "+Infinity"));
                }
            }
            return Optional.empty();
        }
        catch (IllegalArgumentException e) {
            return Optional.of(String.format("Invalid value in the autoscaler config %s", autoScalerConfig.key()));
        }
    }

    private static <T extends Number> Optional<String> validateNumber(Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
        return DefaultValidator.validateNumber(flinkConfiguration, autoScalerConfig, min, null);
    }
}

