/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.TestingPartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.jobmanager.TestingJobPersistenceComponentFactory;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.BlobServerExtension;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for {@link DispatcherRunner} instances created through
 * {@link DefaultDispatcherRunnerFactory}.
 *
 * <p>Leadership is driven manually through a {@link TestingLeaderElection}; the tests then
 * connect to the dispatcher announced in the confirmed {@link LeaderInformation} and inspect
 * the jobs it reports.
 */
class DefaultDispatcherRunnerITCase {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunnerITCase.class);

    /** Timeout handed to the dispatcher gateway calls issued by the tests. */
    private static final Duration TIMEOUT = Duration.ofSeconds(10);

    @RegisterExtension
    public static AllCallbackWrapper<TestingRpcServiceExtension> rpcServiceExtensionWrapper =
            new AllCallbackWrapper<>(new TestingRpcServiceExtension());

    @RegisterExtension
    public static AllCallbackWrapper<BlobServerExtension> blobServerExtensionWrapper =
            new AllCallbackWrapper<>(new BlobServerExtension());

    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    private JobGraph jobGraph;
    private TestingLeaderElection dispatcherLeaderElection;
    private TestingFatalErrorHandler fatalErrorHandler;
    private JobGraphStore jobGraphStore;
    private JobResultStore jobResultStore;
    private PartialDispatcherServices partialDispatcherServices;
    private DefaultDispatcherRunnerFactory dispatcherRunnerFactory;

    /** Creates fresh collaborators (job graph, leader election, stores, services) per test. */
    @BeforeEach
    void setup() {
        dispatcherRunnerFactory =
                DefaultDispatcherRunnerFactory.createSessionRunner(
                        SessionDispatcherFactory.INSTANCE);
        jobGraph = createJobGraph();
        dispatcherLeaderElection = new TestingLeaderElection();
        fatalErrorHandler = new TestingFatalErrorHandler();
        jobGraphStore = TestingJobGraphStore.newBuilder().build();
        jobResultStore = new EmbeddedJobResultStore();
        partialDispatcherServices =
                TestingPartialDispatcherServices.builder()
                        .withFatalErrorHandler(fatalErrorHandler)
                        .build(
                                blobServerExtensionWrapper.getCustomExtension().getBlobServer(),
                                new Configuration());
    }

    /** Rethrows any error that was reported to the fatal error handler during the test. */
    @AfterEach
    void teardown() throws Exception {
        if (fatalErrorHandler != null) {
            fatalErrorHandler.rethrowError();
        }
    }

    /**
     * Submits a job to the first leader, revokes and re-grants leadership, and verifies that
     * the dispatcher reachable under the second leader session lists exactly the submitted job.
     */
    @Test
    void leaderChange_afterJobSubmission_recoversSubmittedJob() throws Exception {
        try (DispatcherRunner dispatcherRunner = createDispatcherRunner()) {
            final UUID firstLeaderSessionId = UUID.randomUUID();
            final DispatcherGateway firstDispatcherGateway =
                    electLeaderAndRetrieveGateway(firstLeaderSessionId);
            firstDispatcherGateway.submitJob(jobGraph, TIMEOUT).get();

            dispatcherLeaderElection.notLeader();

            final UUID secondLeaderSessionId = UUID.randomUUID();
            final DispatcherGateway secondDispatcherGateway =
                    electLeaderAndRetrieveGateway(secondLeaderSessionId);

            final Collection<JobID> jobIds = secondDispatcherGateway.listJobs(TIMEOUT).get();
            Assertions.assertThat(jobIds).containsExactly(jobGraph.getJobID());
        }
    }

    /**
     * Grants leadership for the given session id and, once the leader information is
     * confirmed, connects to the dispatcher it announces.
     *
     * @param firstLeaderSessionId leader session id to grant leadership for
     * @return gateway of the dispatcher announced for that leader session
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if granting leadership or connecting fails
     */
    private DispatcherGateway electLeaderAndRetrieveGateway(UUID firstLeaderSessionId)
            throws InterruptedException, ExecutionException {
        final CompletableFuture<DispatcherGateway> gatewayFuture =
                dispatcherLeaderElection
                        .isLeader(firstLeaderSessionId)
                        .thenCompose(DefaultDispatcherRunnerITCase::connectToDispatcher);
        return gatewayFuture.get();
    }

    /**
     * Pre-populates the job graph store with a job, then toggles leadership several times
     * while the termination of the first job manager runner is still pending. Asserts that
     * the last leadership grant is not confirmed before that termination completes, and that
     * afterwards the confirmed leader carries the expected session id and lists the job.
     */
    @Test
    void leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader() throws Exception {
        final TestingJobMasterServiceLeadershipRunnerFactory jobManagerRunnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory(1);
        final TestingCleanupRunnerFactory cleanupRunnerFactory = new TestingCleanupRunnerFactory();
        dispatcherRunnerFactory =
                DefaultDispatcherRunnerFactory.createSessionRunner(
                        new TestingDispatcherFactory(jobManagerRunnerFactory, cleanupRunnerFactory));

        jobGraphStore.start(null);
        jobGraphStore.putJobGraph(jobGraph);

        try (DispatcherRunner dispatcherRunner = createDispatcherRunner()) {
            // Initial leadership: wait until it is confirmed so that a runner has been created.
            dispatcherLeaderElection.isLeader(UUID.randomUUID()).get();

            try (TestingJobManagerRunner testingJobManagerRunner =
                    jobManagerRunnerFactory.takeCreatedJobManagerRunner()) {
                dispatcherLeaderElection.notLeader();

                LOG.info("Re-grant leadership first time.");
                dispatcherLeaderElection.isLeader(UUID.randomUUID());

                // NOTE(review): presumably a short grace period for the dispatcher to start
                // recovering jobs before leadership is revoked again; confirm the intent.
                Thread.sleep(1L);

                dispatcherLeaderElection.notLeader();

                LOG.info("Re-grant leadership second time.");
                final UUID leaderSessionId = UUID.randomUUID();
                final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                        dispatcherLeaderElection.isLeader(leaderSessionId);
                Assertions.assertThat(confirmedLeaderInformation).isNotDone();

                LOG.info("Complete the termination of the first job manager runner.");
                testingJobManagerRunner.completeTerminationFuture();

                final LeaderInformation actualConfirmedLeaderInformation =
                        confirmedLeaderInformation.join();
                Assertions.assertThat(actualConfirmedLeaderInformation.getLeaderSessionID())
                        .isEqualTo(leaderSessionId);

                final DispatcherGateway leaderGateway =
                        connectToDispatcher(actualConfirmedLeaderInformation).get();
                FlinkAssertions.assertThatFuture(leaderGateway.listJobs(TIMEOUT))
                        .eventuallySucceeds()
                        .isEqualTo(Collections.singleton(jobGraph.getJobID()));
            }
        }
    }

    /** Returns the job graph used by the tests. */
    private static JobGraph createJobGraph() {
        return JobGraphTestUtils.singleNoOpJobGraph();
    }

    /**
     * Connects to the dispatcher described by the given leader information, using the leader
     * session id as the fencing token.
     *
     * @param leaderInformation address and session id of the dispatcher to connect to
     * @return future that completes with the dispatcher gateway
     */
    private static CompletableFuture<DispatcherGateway> connectToDispatcher(
            LeaderInformation leaderInformation) {
        final RpcService rpcService =
                rpcServiceExtensionWrapper.getCustomExtension().getTestingRpcService();
        return rpcService.connect(
                leaderInformation.getLeaderAddress(),
                DispatcherId.fromUuid(leaderInformation.getLeaderSessionID()),
                DispatcherGateway.class);
    }

    /**
     * Builds a {@link DispatcherRunner} from the currently configured factory and the
     * per-test collaborators created in {@link #setup()}.
     */
    private DispatcherRunner createDispatcherRunner() throws Exception {
        return dispatcherRunnerFactory.createDispatcherRunner(
                dispatcherLeaderElection,
                fatalErrorHandler,
                new TestingJobPersistenceComponentFactory(jobGraphStore, jobResultStore),
                EXECUTOR_RESOURCE.getExecutor(),
                rpcServiceExtensionWrapper.getCustomExtension().getTestingRpcService(),
                partialDispatcherServices);
    }

    /**
     * {@link DispatcherFactory} that creates a {@link StandaloneDispatcher} wired with the
     * supplied job manager runner and cleanup runner factories.
     */
    private static class TestingDispatcherFactory implements DispatcherFactory {
        private final JobManagerRunnerFactory jobManagerRunnerFactory;
        private final CleanupRunnerFactory cleanupRunnerFactory;

        private TestingDispatcherFactory(
                JobManagerRunnerFactory jobManagerRunnerFactory,
                CleanupRunnerFactory cleanupRunnerFactory) {
            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
            this.cleanupRunnerFactory = cleanupRunnerFactory;
        }

        @Override
        public Dispatcher createDispatcher(
                RpcService rpcService,
                DispatcherId fencingToken,
                Collection<JobGraph> recoveredJobs,
                Collection<JobResult> recoveredDirtyJobResults,
                DispatcherBootstrapFactory dispatcherBootstrapFactory,
                PartialDispatcherServicesWithJobPersistenceComponents
                        partialDispatcherServicesWithJobPersistenceComponents)
                throws Exception {
            return new StandaloneDispatcher(
                    rpcService,
                    fencingToken,
                    recoveredJobs,
                    recoveredDirtyJobResults,
                    dispatcherBootstrapFactory,
                    DispatcherServices.from(
                            partialDispatcherServicesWithJobPersistenceComponents,
                            jobManagerRunnerFactory,
                            cleanupRunnerFactory));
        }
    }
}

