/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFactory;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.health.HealthProbe;
import org.apache.flink.kubernetes.operator.health.OperatorHealthService;
import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverFactory;
import org.apache.flink.kubernetes.operator.observer.sessionjob.FlinkSessionJobObserver;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkOperator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
    private final Operator operator;
    private final KubernetesClient client;
    private final FlinkResourceContextFactory ctxFactory;
    private final FlinkConfigManager configManager;
    private final Set<FlinkResourceValidator> validators;
    @VisibleForTesting
    final Set<RegisteredController<?>> registeredControllers = new HashSet();
    private final KubernetesOperatorMetricGroup metricGroup;
    private final Collection<FlinkResourceListener> listeners;
    private final OperatorHealthService operatorHealthService;
    private final EventRecorder eventRecorder;
    private final Configuration baseConfig;

    public FlinkOperator(@Nullable Configuration conf) {
        this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager(this::handleNamespaceChanges);
        this.baseConfig = this.configManager.getDefaultConfig();
        this.metricGroup = OperatorMetricUtils.initOperatorMetrics(this.baseConfig);
        this.client = KubernetesClientUtils.getKubernetesClient(this.configManager.getOperatorConfiguration(), (MetricGroup)this.metricGroup);
        this.operator = this.createOperator();
        this.validators = ValidatorUtils.discoverValidators(this.configManager);
        this.listeners = ListenerUtils.discoverListeners(this.configManager);
        this.eventRecorder = EventRecorder.create(this.client, this.listeners);
        this.ctxFactory = new FlinkResourceContextFactory(this.configManager, this.metricGroup, this.eventRecorder);
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder((Configuration)this.baseConfig);
        FileSystem.initialize((Configuration)this.baseConfig, (PluginManager)pluginManager);
        this.operatorHealthService = OperatorHealthService.fromConfig(this.configManager);
    }

    @VisibleForTesting
    protected Operator createOperator() {
        return new Operator(this::overrideOperatorConfigs);
    }

    @VisibleForTesting
    protected Operator getOperator() {
        return this.operator;
    }

    private void handleNamespaceChanges(Set<String> namespaces) {
        this.registeredControllers.forEach(controller -> {
            if (controller.allowsNamespaceChanges()) {
                LOG.info("Changing namespaces on {} to {}", controller, (Object)namespaces);
                controller.changeNamespaces(namespaces);
            }
        });
    }

    private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
        overrider.withKubernetesClient(this.client);
        Configuration conf = this.configManager.getDefaultConfig();
        FlinkOperatorConfiguration operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
        int parallelism = operatorConf.getReconcilerMaxParallelism();
        if (parallelism == -1) {
            LOG.info("Configuring operator with unbounded reconciliation thread pool.");
            overrider.withExecutorService(Executors.newCachedThreadPool());
        } else {
            LOG.info("Configuring operator with {} reconciliation threads.", (Object)parallelism);
            overrider.withConcurrentReconciliationThreads(parallelism);
        }
        if (operatorConf.isJosdkMetricsEnabled()) {
            overrider.withMetrics((Metrics)new OperatorJosdkMetrics(this.metricGroup, this.configManager));
        }
        overrider.withTerminationTimeoutSeconds((int)((Duration)conf.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT)).toSeconds());
        overrider.withStopOnInformerErrorDuringStartup(((Boolean)conf.get(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR)).booleanValue());
        LeaderElectionConfiguration leaderElectionConf = operatorConf.getLeaderElectionConfiguration();
        if (leaderElectionConf != null) {
            overrider.withLeaderElectionConfiguration(leaderElectionConf);
            LOG.info("Operator leader election is enabled.");
        } else {
            LOG.info("Operator leader election is disabled.");
        }
    }

    @VisibleForTesting
    void registerDeploymentController() {
        MetricManager<FlinkDeployment> metricManager = MetricManager.createFlinkDeploymentMetricManager(this.baseConfig, this.metricGroup);
        StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder = StatusRecorder.create(this.client, metricManager, this.listeners);
        JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler = AutoscalerFactory.create(this.client, this.eventRecorder);
        ReconcilerFactory reconcilerFactory = new ReconcilerFactory(this.eventRecorder, statusRecorder, autoscaler);
        FlinkDeploymentObserverFactory observerFactory = new FlinkDeploymentObserverFactory(this.eventRecorder);
        CanaryResourceManager<FlinkDeployment> canaryResourceManager = new CanaryResourceManager<FlinkDeployment>(this.configManager);
        HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
        FlinkDeploymentController controller = new FlinkDeploymentController(this.validators, this.ctxFactory, reconcilerFactory, observerFactory, statusRecorder, this.eventRecorder, canaryResourceManager);
        this.registeredControllers.add(this.operator.register((Reconciler)controller, this::overrideControllerConfigs));
    }

    @VisibleForTesting
    void registerSessionJobController() {
        EventRecorder eventRecorder = EventRecorder.create(this.client, this.listeners);
        MetricManager<FlinkSessionJob> metricManager = MetricManager.createFlinkSessionJobMetricManager(this.baseConfig, this.metricGroup);
        StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder = StatusRecorder.create(this.client, metricManager, this.listeners);
        SessionJobReconciler reconciler = new SessionJobReconciler(eventRecorder, statusRecorder);
        FlinkSessionJobObserver observer = new FlinkSessionJobObserver(eventRecorder);
        CanaryResourceManager<FlinkSessionJob> canaryResourceManager = new CanaryResourceManager<FlinkSessionJob>(this.configManager);
        HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
        FlinkSessionJobController controller = new FlinkSessionJobController(this.validators, this.ctxFactory, reconciler, observer, statusRecorder, eventRecorder, canaryResourceManager);
        this.registeredControllers.add(this.operator.register((Reconciler)controller, this::overrideControllerConfigs));
    }

    private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
        FlinkOperatorConfiguration operatorConf = this.configManager.getOperatorConfiguration();
        Set<String> watchNamespaces = operatorConf.getWatchedNamespaces();
        LOG.info("Configuring operator to watch the following namespaces: {}.", watchNamespaces);
        overrider.settingNamespaces(operatorConf.getWatchedNamespaces());
        overrider.withRetry((Retry)operatorConf.getRetryConfiguration());
        overrider.withRateLimiter(operatorConf.getRateLimiter());
        String labelSelector = operatorConf.getLabelSelector();
        LOG.info("Configuring operator to select custom resources with the {} labels.", (Object)labelSelector);
        overrider.withLabelSelector(labelSelector);
    }

    public void run() {
        this.registerDeploymentController();
        this.registerSessionJobController();
        this.operator.installShutdownHook((Duration)this.baseConfig.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT));
        this.operator.start();
        if (this.operatorHealthService != null) {
            HealthProbe.INSTANCE.setRuntimeInfo(this.operator.getRuntimeInfo());
            Runtime.getRuntime().addShutdownHook(new Thread(this.operatorHealthService::stop));
            this.operatorHealthService.start();
        }
    }

    public void stop() {
        this.operator.stop();
    }

    public static void main(String ... args) {
        EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Operator", args);
        new FlinkOperator(null).run();
    }
}

