/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.service;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobContext;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.service.NativeFlinkService;
import org.apache.flink.kubernetes.operator.service.StandaloneFlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that builds a {@link FlinkResourceContext} for a given Flink custom resource.
 *
 * <p>The factory keeps one {@link KubernetesResourceMetricGroup} per (resource class,
 * {@link ResourceID}) pair in {@link #resourceMetricGroups}; entries are created lazily by
 * {@link #getResourceContext} and removed and closed by {@link #cleanup}.
 */
public class FlinkResourceContextFactory {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkResourceContextFactory.class);

    private final FlinkConfigManager configManager;
    private final ArtifactManager artifactManager;
    private final ExecutorService clientExecutorService;
    private final KubernetesOperatorMetricGroup operatorMetricGroup;
    private final EventRecorder eventRecorder;

    /** Metric groups of the resources seen so far, keyed by resource class and resource id. */
    protected final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup>
            resourceMetricGroups = new ConcurrentHashMap<>();

    /**
     * Creates the factory together with the {@link ArtifactManager} and the fixed-size executor
     * that are handed to every {@link FlinkService} built by {@link #getFlinkService}.
     *
     * @param configManager source of the operator configuration; also used to size the executor
     *     via {@code getOperatorConfiguration().getReconcilerMaxParallelism()}
     * @param operatorMetricGroup parent metric group used when creating resource metric groups
     * @param eventRecorder event recorder passed to the native Flink service
     */
    public FlinkResourceContextFactory(
            FlinkConfigManager configManager,
            KubernetesOperatorMetricGroup operatorMetricGroup,
            EventRecorder eventRecorder) {
        this.configManager = configManager;
        this.operatorMetricGroup = operatorMetricGroup;
        this.eventRecorder = eventRecorder;
        this.artifactManager = new ArtifactManager(configManager);

        ThreadFactory ioThreadFactory = new ExecutorThreadFactory("Flink-RestClusterClient-IO");
        this.clientExecutorService =
                Executors.newFixedThreadPool(
                        configManager.getOperatorConfiguration().getReconcilerMaxParallelism(),
                        ioThreadFactory);
    }

    /**
     * Returns a context for the given resource, creating and caching its metric group on first
     * use.
     *
     * @param resource the Flink custom resource to build a context for
     * @param josdkContext the Java Operator SDK context passed through to the created context
     * @param <CR> concrete custom resource type
     * @return a {@link FlinkDeploymentContext} or {@link FlinkSessionJobContext}, depending on
     *     the runtime type of {@code resource}
     * @throws IllegalArgumentException if {@code resource} is neither a {@link FlinkDeployment}
     *     nor a {@link FlinkSessionJob}
     */
    public <CR extends AbstractFlinkResource<?, ?>> FlinkResourceContext<CR> getResourceContext(CR resource, Context josdkContext) {
        KubernetesResourceMetricGroup resMg =
                this.resourceMetricGroups.computeIfAbsent(
                        metricGroupKey(resource),
                        key ->
                                OperatorMetricUtils.createResourceMetricGroup(
                                        this.operatorMetricGroup, this.configManager, resource));

        if (resource instanceof FlinkDeployment) {
            // The instanceof check above guarantees the wrapped resource is a valid CR.
            @SuppressWarnings("unchecked")
            FlinkResourceContext<CR> deploymentCtx =
                    (FlinkResourceContext<CR>)
                            new FlinkDeploymentContext(
                                    (FlinkDeployment) resource,
                                    josdkContext,
                                    resMg,
                                    this.configManager,
                                    this::getFlinkService);
            return deploymentCtx;
        }
        if (resource instanceof FlinkSessionJob) {
            // The instanceof check above guarantees the wrapped resource is a valid CR.
            @SuppressWarnings("unchecked")
            FlinkResourceContext<CR> sessionJobCtx =
                    (FlinkResourceContext<CR>)
                            new FlinkSessionJobContext(
                                    (FlinkSessionJob) resource,
                                    josdkContext,
                                    resMg,
                                    this.configManager,
                                    this::getFlinkService);
            return sessionJobCtx;
        }
        throw new IllegalArgumentException("Unknown resource type " + resource.getClass().getSimpleName());
    }

    /**
     * Creates the {@link FlinkService} implementation matching the deployment mode reported by
     * the context.
     *
     * @param ctx resource context supplying the deployment mode, Kubernetes client and operator
     *     config
     * @return a new {@link NativeFlinkService} or {@link StandaloneFlinkService}
     * @throws UnsupportedOperationException if the deployment mode is neither {@code NATIVE} nor
     *     {@code STANDALONE}
     */
    @VisibleForTesting
    protected FlinkService getFlinkService(FlinkResourceContext<?> ctx) {
        KubernetesDeploymentMode deploymentMode = ctx.getDeploymentMode();
        switch (deploymentMode) {
            case NATIVE:
                return new NativeFlinkService(
                        ctx.getKubernetesClient(),
                        this.artifactManager,
                        this.clientExecutorService,
                        ctx.getOperatorConfig(),
                        this.eventRecorder);
            case STANDALONE:
                return new StandaloneFlinkService(
                        ctx.getKubernetesClient(),
                        this.artifactManager,
                        this.clientExecutorService,
                        ctx.getOperatorConfig());
            default:
                throw new UnsupportedOperationException(
                        String.format("Unsupported deployment mode: %s", deploymentMode));
        }
    }

    /**
     * Removes the cached metric group of the given resource and closes it. Logs a warning when
     * no metric group is registered for the resource.
     *
     * @param flinkApp the resource whose metric group should be released
     * @param <CR> concrete custom resource type
     */
    public <CR extends AbstractFlinkResource<?, ?>> void cleanup(CR flinkApp) {
        KubernetesResourceMetricGroup resourceMetricGroup =
                this.resourceMetricGroups.remove(metricGroupKey(flinkApp));
        if (resourceMetricGroup != null) {
            resourceMetricGroup.close();
        } else {
            LOG.warn("Unknown resource metric group for {}", flinkApp);
        }
    }

    /** Builds the {@link #resourceMetricGroups} key for a resource: its class plus its id. */
    private static Tuple2<Class<?>, ResourceID> metricGroupKey(AbstractFlinkResource<?, ?> resource) {
        return Tuple2.of(resource.getClass(), ResourceID.fromResource(resource));
    }
}

