/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the operator-wide default Flink {@link Configuration} and derives the effective
 * configuration of individual resources from it.
 *
 * <p>Generated configurations are kept in a cache keyed by the default-config version, the
 * resource namespace and name, and a JSON tree of the deployment spec. Callers always receive a
 * clone of the cached (or default) configuration, never the shared instance itself.
 *
 * <p>NOTE(review): the scheduled executor created in the constructor is never shut down by this
 * class; confirm that this is acceptable for the lifetime of the operator process.
 */
public class FlinkConfigManager {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private volatile Configuration defaultConfig;
    private volatile FlinkOperatorConfiguration operatorConfiguration;
    /** Incremented on every effective default-config change; part of the cache key. */
    private final AtomicLong defaultConfigVersion = new AtomicLong(0L);
    private final LoadingCache<Key, Configuration> cache;
    private final Consumer<Set<String>> namespaceListener;

    /**
     * Creates a manager for the given default configuration with a no-op namespace listener.
     *
     * @param defaultConfig initial default configuration
     */
    @VisibleForTesting
    public FlinkConfigManager(Configuration defaultConfig) {
        this(defaultConfig, ns -> {});
    }

    /**
     * Creates a manager whose default configuration is loaded through
     * {@link #loadGlobalConfiguration()}.
     *
     * @param namespaceListener callback receiving the new set of watched namespaces
     */
    public FlinkConfigManager(Consumer<Set<String>> namespaceListener) {
        this(FlinkConfigManager.loadGlobalConfiguration(), namespaceListener);
    }

    /**
     * Creates a manager for the given default configuration.
     *
     * <p>The cache size and access-expiry timeout are read from {@code defaultConfig}. A
     * single-threaded scheduler runs {@code cache.cleanUp()} once per timeout interval and, when
     * {@code OPERATOR_DYNAMIC_CONFIG_ENABLED} is set, also runs the periodic config reload.
     *
     * @param defaultConfig initial default configuration
     * @param namespaceListener callback receiving the new set of watched namespaces
     */
    public FlinkConfigManager(Configuration defaultConfig, Consumer<Set<String>> namespaceListener) {
        this.namespaceListener = namespaceListener;

        Duration cacheTimeout =
                defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
        int cacheSize =
                defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE);

        this.cache =
                CacheBuilder.newBuilder()
                        .maximumSize(cacheSize)
                        .expireAfterAccess(cacheTimeout)
                        // Evicted/expired configs get their temporary files cleaned up.
                        .removalListener(
                                removalNotification ->
                                        FlinkConfigBuilder.cleanupTmpFiles(
                                                (Configuration) removalNotification.getValue()))
                        .build(
                                new CacheLoader<Key, Configuration>() {
                                    @Override
                                    public Configuration load(Key k) {
                                        return generateConfig(k);
                                    }
                                });

        this.updateDefaultConfig(defaultConfig);

        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        long timeoutMillis = cacheTimeout.toMillis();
        // Run cache maintenance periodically so the removal listener fires for expired entries.
        executorService.scheduleWithFixedDelay(
                () -> this.cache.cleanUp(), timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);

        if (defaultConfig.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
            this.scheduleConfigWatcher(executorService);
        }
    }

    /**
     * Returns a copy of the current default configuration.
     *
     * @return clone of the default configuration
     */
    public Configuration getDefaultConfig() {
        return this.defaultConfig.clone();
    }

    /**
     * Replaces the default configuration if its key/value map differs from the current one.
     *
     * <p>On change, the operator configuration is rebuilt and the config version is incremented.
     * If dynamic namespaces are enabled and the watched namespaces changed, the namespace
     * listener is called with the new set. Cache entries are not invalidated here; entries built
     * for an older version are simply no longer looked up, because the version is part of the key.
     *
     * @param newConf new default configuration
     */
    @VisibleForTesting
    public void updateDefaultConfig(Configuration newConf) {
        if (ObjectUtils.allNotNull(this.defaultConfig, newConf)
                && this.defaultConfig.toMap().equals(newConf.toMap())) {
            LOG.info("Default configuration did not change, nothing to do...");
            return;
        }

        LOG.info("Setting default configuration to {}", newConf);

        // No operator configuration exists on the very first call, hence the empty-set default.
        Set<String> oldNs =
                Optional.ofNullable(this.operatorConfiguration)
                        .map(FlinkOperatorConfiguration::getWatchedNamespaces)
                        .orElse(Set.of());

        this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(newConf);
        Set<String> newNs = this.operatorConfiguration.getWatchedNamespaces();
        if (this.operatorConfiguration.isDynamicNamespacesEnabled() && !oldNs.equals(newNs)) {
            this.namespaceListener.accept(newNs);
        }

        this.defaultConfig = newConf.clone();
        this.defaultConfigVersion.incrementAndGet();
    }

    /**
     * Returns the operator configuration derived from the current default configuration.
     *
     * @return current operator configuration
     */
    public FlinkOperatorConfiguration getOperatorConfiguration() {
        return this.operatorConfiguration;
    }

    /**
     * Builds the configuration used to deploy the given spec.
     *
     * @param objectMeta metadata of the resource (name, namespace, generation)
     * @param spec deployment spec to build the configuration for
     * @return a private copy of the configuration, with the resource generation applied through
     *     {@link FlinkUtils#setGenerationAnnotation}
     */
    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
        Configuration conf = this.getConfig(objectMeta, spec);
        FlinkUtils.setGenerationAnnotation(conf, objectMeta.getGeneration());
        return conf;
    }

    /**
     * Builds the configuration used to observe an already deployed resource.
     *
     * <p>The configuration is based on the last deployed spec. Operator options
     * ({@code kubernetes.operator.*}) and the savepoint directory are then overlaid from the
     * resource's current spec.
     *
     * @param deployment deployment to observe
     * @return a private copy of the observe configuration
     * @throws RuntimeException if the deployment has no deployed spec yet
     */
    public Configuration getObserveConfig(FlinkDeployment deployment) {
        FlinkDeploymentSpec deployedSpec =
                (FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(deployment);
        if (deployedSpec == null) {
            throw new RuntimeException(
                    "Cannot create observe config before first deployment, this indicates a bug.");
        }
        Configuration conf = this.getConfig(deployment.getMetadata(), deployedSpec);
        this.applyConfigsFromCurrentSpec(
                (AbstractFlinkSpec) deployment.getSpec(),
                conf,
                CheckpointingOptions.SAVEPOINT_DIRECTORY);
        return conf;
    }

    /** Copies every {@code kubernetes.operator.*} entry of the spec's Flink configuration into {@code conf}. */
    private void addOperatorConfigsFromSpec(AbstractFlinkSpec spec, Configuration conf) {
        Map<String, String> flinkConfiguration = spec.getFlinkConfiguration();
        if (flinkConfiguration == null) {
            return;
        }
        flinkConfiguration.forEach(
                (k, v) -> {
                    if (k.startsWith("kubernetes.operator.")) {
                        conf.setString(k, v);
                    }
                });
    }

    /**
     * Overlays operator options and the listed options from the spec's Flink configuration onto
     * {@code conf}. Options that the spec does not set are left untouched.
     */
    private void applyConfigsFromCurrentSpec(
            AbstractFlinkSpec spec, Configuration conf, ConfigOption<?>... configOptions) {
        this.addOperatorConfigsFromSpec(spec, conf);
        Map<String, String> flinkConfiguration = spec.getFlinkConfiguration();
        if (flinkConfiguration == null) {
            return;
        }
        Configuration deployConfig = Configuration.fromMap(flinkConfiguration);
        for (ConfigOption<?> configOption : configOptions) {
            copyIfPresent(deployConfig, conf, configOption);
        }
    }

    /** Sets {@code option} on {@code target} when {@code source} has a value for it. */
    private static <T> void copyIfPresent(
            Configuration source, Configuration target, ConfigOption<T> option) {
        source.getOptional(option).ifPresent(v -> target.set(option, v));
    }

    /**
     * Builds the configuration for a session job running on the given session deployment.
     *
     * @param deployment session cluster deployment
     * @param sessionJobSpec spec of the session job
     * @return the deployment's observe configuration with every entry of the session job's Flink
     *     configuration set on top
     */
    public Configuration getSessionJobConfig(
            FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) {
        Configuration sessionJobConfig = this.getObserveConfig(deployment);
        Map<String, String> sessionJobFlinkConfiguration = sessionJobSpec.getFlinkConfiguration();
        if (sessionJobFlinkConfiguration != null) {
            sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);
        }
        return sessionJobConfig;
    }

    /**
     * Looks up (or generates and caches) the configuration for the given resource and spec and
     * returns a clone of it, so callers can modify the result without affecting the cache.
     */
    private Configuration getConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
        Key key =
                Key.builder()
                        .configVersion(this.defaultConfigVersion.get())
                        .name(objectMeta.getName())
                        .namespace(objectMeta.getNamespace())
                        .spec(objectMapper.convertValue(spec, ObjectNode.class))
                        .build();
        // The loader only throws unchecked exceptions, so the unchecked lookup is sufficient and
        // avoids dealing with the checked ExecutionException of LoadingCache#get.
        return this.cache.getUnchecked(key).clone();
    }

    /**
     * Cache loader body: builds a configuration from the key's spec and the current default
     * configuration.
     *
     * @throws RuntimeException wrapping any failure raised while building the configuration
     */
    private Configuration generateConfig(Key key) {
        try {
            LOG.info("Generating new config");
            return FlinkConfigBuilder.buildFrom(
                    key.namespace,
                    key.name,
                    objectMapper.convertValue(key.spec, FlinkDeploymentSpec.class),
                    this.defaultConfig);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load configuration", e);
        }
    }

    /** Schedules the periodic reload of the default configuration on the given executor. */
    private void scheduleConfigWatcher(ScheduledExecutorService executorService) {
        Duration checkInterval =
                this.defaultConfig.get(
                        KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
        long millis = checkInterval.toMillis();
        executorService.scheduleAtFixedRate(
                new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS);
        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
    }

    /** Exposes the underlying config cache. */
    @VisibleForTesting
    protected Cache<Key, Configuration> getCache() {
        return this.cache;
    }

    /** Loads the default configuration, using the {@code CONF_OVERRIDE_DIR} value from {@link EnvUtils}. */
    private static Configuration loadGlobalConfiguration() {
        return FlinkConfigManager.loadGlobalConfiguration(EnvUtils.get("CONF_OVERRIDE_DIR"));
    }

    /**
     * Loads the default configuration.
     *
     * @param confOverrideDir optional directory; when present, the configuration found there is
     *     passed to {@link GlobalConfiguration#loadConfiguration(Configuration)}
     * @return the loaded configuration
     */
    @VisibleForTesting
    protected static Configuration loadGlobalConfiguration(Optional<String> confOverrideDir) {
        if (confOverrideDir.isPresent()) {
            String overrideDir = confOverrideDir.get();
            Configuration configOverrides = GlobalConfiguration.loadConfiguration(overrideDir);
            LOG.debug("Loading default configuration with overrides from {}", overrideDir);
            return GlobalConfiguration.loadConfiguration(configOverrides);
        }
        LOG.debug("Loading default configuration");
        return GlobalConfiguration.loadConfiguration();
    }

    /**
     * Cache key: a config is regenerated whenever the default-config version, the resource
     * identity (namespace and name) or the JSON form of the spec differs.
     */
    private static final class Key {
        private final long configVersion;
        private final String namespace;
        private final String name;
        private final ObjectNode spec;

        Key(long configVersion, String namespace, String name, ObjectNode spec) {
            this.configVersion = configVersion;
            this.namespace = namespace;
            this.name = name;
            this.spec = spec;
        }

        public static KeyBuilder builder() {
            return new KeyBuilder();
        }

        public long getConfigVersion() {
            return this.configVersion;
        }

        public String getNamespace() {
            return this.namespace;
        }

        public String getName() {
            return this.name;
        }

        public ObjectNode getSpec() {
            return this.spec;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return this.configVersion == other.configVersion
                    && Objects.equals(this.namespace, other.namespace)
                    && Objects.equals(this.name, other.name)
                    && Objects.equals(this.spec, other.spec);
        }

        @Override
        public int hashCode() {
            // Same combination as before: multiplier 59, and 43 standing in for null fields.
            int result = 1;
            result = result * 59 + Long.hashCode(this.configVersion);
            result = result * 59 + (this.namespace == null ? 43 : this.namespace.hashCode());
            result = result * 59 + (this.name == null ? 43 : this.name.hashCode());
            result = result * 59 + (this.spec == null ? 43 : this.spec.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "FlinkConfigManager.Key(configVersion=" + this.configVersion
                    + ", namespace=" + this.namespace
                    + ", name=" + this.name
                    + ", spec=" + this.spec + ")";
        }

        /** Fluent builder for {@link Key}. */
        public static class KeyBuilder {
            private long configVersion;
            private String namespace;
            private String name;
            private ObjectNode spec;

            KeyBuilder() {
            }

            public KeyBuilder configVersion(long configVersion) {
                this.configVersion = configVersion;
                return this;
            }

            public KeyBuilder namespace(String namespace) {
                this.namespace = namespace;
                return this;
            }

            public KeyBuilder name(String name) {
                this.name = name;
                return this;
            }

            public KeyBuilder spec(ObjectNode spec) {
                this.spec = spec;
                return this;
            }

            public Key build() {
                return new Key(this.configVersion, this.namespace, this.name, this.spec);
            }

            @Override
            public String toString() {
                return "FlinkConfigManager.Key.KeyBuilder(configVersion=" + this.configVersion
                        + ", namespace=" + this.namespace
                        + ", name=" + this.name
                        + ", spec=" + this.spec + ")";
            }
        }
    }

    /**
     * Periodic task that reloads the global configuration and applies it as the new default.
     * Failures are logged rather than propagated.
     */
    private class ConfigUpdater implements Runnable {
        private ConfigUpdater() {
        }

        @Override
        public void run() {
            try {
                LOG.debug("Checking for config update changes...");
                FlinkConfigManager.this.updateDefaultConfig(
                        FlinkConfigManager.loadGlobalConfiguration());
            } catch (Exception e) {
                // An exception escaping a scheduled task would suppress its later executions.
                LOG.error("Error while updating operator configuration", e);
            }
        }
    }
}

