/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.highavailability.KubernetesCheckpointRecoveryFactory;
import org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionDriverFactory;
import org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderRetrievalDriverFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.AbstractHaServices;
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriverFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

public class KubernetesMultipleComponentLeaderElectionHaServices
extends AbstractHaServices {
    private final Object lock = new Object();
    private final String clusterId;
    private final FlinkKubeClient kubeClient;
    private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
    private final ExecutorService watchExecutorService;
    private final String lockIdentity;
    private final FatalErrorHandler fatalErrorHandler;
    @Nullable
    @GuardedBy(value="lock")
    private DefaultMultipleComponentLeaderElectionService multipleComponentLeaderElectionService = null;

    KubernetesMultipleComponentLeaderElectionHaServices(FlinkKubeClient kubeClient, Executor executor, Configuration config, BlobStoreService blobStoreService, FatalErrorHandler fatalErrorHandler) throws IOException {
        super(config, executor, blobStoreService, (JobResultStore)FileSystemJobResultStore.fromConfiguration((Configuration)config));
        this.kubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)kubeClient);
        this.clusterId = (String)Preconditions.checkNotNull((Object)config.get(KubernetesConfigOptions.CLUSTER_ID));
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.configMapSharedWatcher = this.kubeClient.createConfigMapSharedWatcher(KubernetesUtils.getConfigMapLabels(this.clusterId, "high-availability"));
        this.watchExecutorService = Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("config-map-watch-handler"));
        this.lockIdentity = UUID.randomUUID().toString();
    }

    protected LeaderElectionService createLeaderElectionService(String leaderName) {
        DefaultMultipleComponentLeaderElectionService multipleComponentLeaderElectionService = this.getOrInitializeSingleLeaderElectionService();
        return new DefaultLeaderElectionService(multipleComponentLeaderElectionService.createDriverFactory(leaderName));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private DefaultMultipleComponentLeaderElectionService getOrInitializeSingleLeaderElectionService() {
        Object object = this.lock;
        synchronized (object) {
            if (this.multipleComponentLeaderElectionService == null) {
                try {
                    KubernetesLeaderElectionConfiguration leaderElectionConfiguration = new KubernetesLeaderElectionConfiguration(this.getClusterConfigMap(), this.lockIdentity, this.configuration);
                    this.multipleComponentLeaderElectionService = new DefaultMultipleComponentLeaderElectionService(this.fatalErrorHandler, (MultipleComponentLeaderElectionDriverFactory)new KubernetesMultipleComponentLeaderElectionDriverFactory(this.kubeClient, leaderElectionConfiguration, this.configMapSharedWatcher, this.watchExecutorService, this.fatalErrorHandler));
                }
                catch (Exception e) {
                    throw new FlinkRuntimeException("Could not initialize the default single leader election service.", (Throwable)e);
                }
            }
            return this.multipleComponentLeaderElectionService;
        }
    }

    protected LeaderRetrievalService createLeaderRetrievalService(String componentId) {
        return new DefaultLeaderRetrievalService((LeaderRetrievalDriverFactory)new KubernetesMultipleComponentLeaderRetrievalDriverFactory(this.kubeClient, this.configMapSharedWatcher, this.watchExecutorService, this.getClusterConfigMap(), componentId));
    }

    protected CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
        return KubernetesCheckpointRecoveryFactory.withoutLeadershipValidation(this.kubeClient, this.configuration, this.ioExecutor, this.clusterId, this::getJobSpecificConfigMap);
    }

    private String getJobSpecificConfigMap(JobID jobID) {
        return this.clusterId + "-" + jobID.toString() + "-" + "config-map";
    }

    protected JobGraphStore createJobGraphStore() throws Exception {
        return KubernetesUtils.createJobGraphStore(this.configuration, this.kubeClient, this.getClusterConfigMap(), this.lockIdentity);
    }

    private String getClusterConfigMap() {
        return this.clusterId + "-" + "cluster-config-map";
    }

    public void internalClose() throws Exception {
        Exception exception = null;
        try {
            this.closeK8sServices();
        }
        catch (Exception e) {
            exception = e;
        }
        this.kubeClient.close();
        ExecutorUtils.gracefulShutdown((long)5L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{this.watchExecutorService});
        ExceptionUtils.tryRethrowException((Exception)exception);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeK8sServices() throws Exception {
        Exception exception = null;
        Object object = this.lock;
        synchronized (object) {
            if (this.multipleComponentLeaderElectionService != null) {
                try {
                    this.multipleComponentLeaderElectionService.close();
                }
                catch (Exception e) {
                    exception = e;
                }
                this.multipleComponentLeaderElectionService = null;
            }
        }
        this.configMapSharedWatcher.close();
        ExceptionUtils.tryRethrowException((Exception)exception);
    }

    public void internalCleanup() throws Exception {
        Exception exception = null;
        try {
            this.closeK8sServices();
        }
        catch (Exception e) {
            exception = e;
        }
        this.kubeClient.deleteConfigMapsByLabels(KubernetesUtils.getConfigMapLabels(this.clusterId, "high-availability")).get();
        ExceptionUtils.tryRethrowException((Exception)exception);
    }

    public void internalCleanupJobData(JobID jobID) throws Exception {
        this.kubeClient.deleteConfigMap(this.getJobSpecificConfigMap(jobID)).get();
    }

    protected String getLeaderPathForResourceManager() {
        return "resourcemanager";
    }

    protected String getLeaderPathForDispatcher() {
        return "dispatcher";
    }

    protected String getLeaderPathForJobManager(JobID jobID) {
        return jobID.toString();
    }

    protected String getLeaderPathForRestServer() {
        return "restserver";
    }
}

