/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherGatewayService;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={TestLoggerExtension.class})
public class SessionDispatcherLeaderProcessTest {
    private static final JobGraph JOB_GRAPH = JobGraphTestUtils.emptyJobGraph();
    private static ExecutorService ioExecutor;
    private final UUID leaderSessionId = UUID.randomUUID();
    private TestingFatalErrorHandler fatalErrorHandler;
    private JobGraphStore jobGraphStore;
    private JobResultStore jobResultStore;
    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherServiceFactory;

    @BeforeAll
    public static void setupClass() {
        ioExecutor = Executors.newSingleThreadExecutor();
    }

    @BeforeEach
    public void setup() {
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().build();
        this.jobResultStore = TestingJobResultStore.builder().build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().build());
    }

    @AfterEach
    public void teardown() throws Exception {
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
            this.fatalErrorHandler = null;
        }
    }

    @AfterAll
    public static void teardownClass() {
        if (ioExecutor != null) {
            ExecutorUtils.gracefulShutdown((long)5L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{ioExecutor});
        }
    }

    @Test
    public void start_afterClose_doesNotHaveAnEffect() throws Exception {
        SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();
        dispatcherLeaderProcess.close();
        dispatcherLeaderProcess.start();
        Assertions.assertThat((Comparable)dispatcherLeaderProcess.getState()).isEqualTo((Object)AbstractDispatcherLeaderProcess.State.STOPPED);
    }

    @Test
    public void testStartTriggeringDispatcherServiceCreation() throws Exception {
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().build());
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            Assertions.assertThat((Comparable)dispatcherLeaderProcess.getState()).isEqualTo((Object)AbstractDispatcherLeaderProcess.State.RUNNING);
        }
    }

    @Test
    public void testRecoveryWithJobGraphButNoDirtyJobResult() throws Exception {
        this.testJobRecovery(Collections.singleton(JOB_GRAPH), Collections.emptySet(), actualRecoveredJobGraphs -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredJobGraphs).singleElement()).isEqualTo((Object)JOB_GRAPH);
        }, actualRecoveredDirtyJobResults -> Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).isEmpty());
    }

    @Test
    public void testRecoveryWithJobGraphAndMatchingDirtyJobResult() throws Exception {
        JobResult matchingDirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());
        this.testJobRecovery(Collections.singleton(JOB_GRAPH), Collections.singleton(matchingDirtyJobResult), actualRecoveredJobGraphs -> Assertions.assertThat((Collection)actualRecoveredJobGraphs).isEmpty(), actualRecoveredDirtyJobResults -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).singleElement()).isEqualTo((Object)matchingDirtyJobResult);
        });
    }

    @Test
    public void testRecoveryWithMultipleJobGraphsAndOneMatchingDirtyJobResult() throws Exception {
        JobResult matchingDirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());
        JobGraph otherJobGraph = JobGraphTestUtils.emptyJobGraph();
        this.testJobRecovery(Arrays.asList(otherJobGraph, JOB_GRAPH), Collections.singleton(matchingDirtyJobResult), actualRecoveredJobGraphs -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredJobGraphs).singleElement()).isEqualTo((Object)otherJobGraph);
        }, actualRecoveredDirtyJobResults -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).singleElement()).isEqualTo((Object)matchingDirtyJobResult);
        });
    }

    @Test
    public void testRecoveryWithoutJobGraphButDirtyJobResult() throws Exception {
        JobResult dirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(new JobID());
        this.testJobRecovery(Collections.emptyList(), Collections.singleton(dirtyJobResult), actualRecoveredJobGraphs -> Assertions.assertThat((Collection)actualRecoveredJobGraphs).isEmpty(), actualRecoveredDirtyJobResults -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)actualRecoveredDirtyJobResults).singleElement()).isEqualTo((Object)dirtyJobResult);
        });
    }

    private void testJobRecovery(Collection<JobGraph> jobGraphsToRecover, Set<JobResult> dirtyJobResults, Consumer<Collection<JobGraph>> recoveredJobGraphAssertion, Consumer<Collection<JobResult>> recoveredDirtyJobResultAssertion) throws Exception {
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setInitialJobGraphs(jobGraphsToRecover).build();
        this.jobResultStore = TestingJobResultStore.builder().withGetDirtyResultsSupplier((SupplierWithException<Set<JobResult>, ? extends IOException>)((SupplierWithException)() -> dirtyJobResults)).build();
        CompletableFuture recoveredJobGraphsFuture = new CompletableFuture();
        CompletableFuture recoveredDirtyJobResultsFuture = new CompletableFuture();
        this.dispatcherServiceFactory = (ignoredDispatcherId, recoveredJobs, recoveredDirtyJobResults, ignoredJobGraphWriter, ignoredJobResultStore) -> {
            recoveredJobGraphsFuture.complete(recoveredJobs);
            recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
            return TestingDispatcherGatewayService.newBuilder().build();
        };
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            recoveredJobGraphAssertion.accept((Collection<JobGraph>)recoveredJobGraphsFuture.get());
            recoveredDirtyJobResultAssertion.accept((Collection<JobResult>)recoveredDirtyJobResultsFuture.get());
        }
    }

    @Test
    public void testRecoveryWhileJobGraphRecoveryIsScheduledConcurrently() throws Exception {
        JobResult dirtyJobResult = TestingJobResultStore.createSuccessfulJobResult(new JobID());
        OneShotLatch recoveryInitiatedLatch = new OneShotLatch();
        OneShotLatch jobGraphAddedLatch = new OneShotLatch();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setRecoverJobGraphFunction((BiFunctionWithException<JobID, Map<JobID, JobGraph>, JobGraph, ? extends Exception>)((BiFunctionWithException)(jobId, jobs) -> null)).build();
        this.jobResultStore = TestingJobResultStore.builder().withGetDirtyResultsSupplier((SupplierWithException<Set<JobResult>, ? extends IOException>)((SupplierWithException)() -> {
            recoveryInitiatedLatch.trigger();
            try {
                jobGraphAddedLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Collections.singleton(dirtyJobResult);
        })).build();
        CompletableFuture recoveredJobGraphsFuture = new CompletableFuture();
        CompletableFuture recoveredDirtyJobResultsFuture = new CompletableFuture();
        this.dispatcherServiceFactory = (ignoredDispatcherId, recoveredJobs, recoveredDirtyJobResults, ignoredJobGraphWriter, ignoredJobResultStore) -> {
            recoveredJobGraphsFuture.complete(recoveredJobs);
            recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
            return TestingDispatcherGatewayService.newBuilder().build();
        };
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            recoveryInitiatedLatch.await();
            dispatcherLeaderProcess.onAddedJobGraph(dirtyJobResult.getJobId());
            jobGraphAddedLatch.trigger();
            Assertions.assertThat(recoveredJobGraphsFuture).succeedsWithin(Duration.ofHours(1L)).satisfies(recovedJobGraphs -> Assertions.assertThat((Collection)recovedJobGraphs).isEmpty());
            Assertions.assertThat(recoveredDirtyJobResultsFuture).succeedsWithin(Duration.ofHours(1L)).satisfies(recoveredDirtyJobResults -> {
                AbstractCollectionAssert cfr_ignored_0 = (AbstractCollectionAssert)Assertions.assertThat((Collection)recoveredDirtyJobResults).containsExactly((Object[])new JobResult[]{dirtyJobResult});
            });
        }
    }

    @Test
    public void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception {
        CompletableFuture jobGraphStopFuture = new CompletableFuture();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setStopRunnable((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)() -> jobGraphStopFuture.complete(null))).build();
        CompletableFuture<Object> dispatcherServiceTerminationFuture = new CompletableFuture<Object>();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setTerminationFuture(dispatcherServiceTerminationFuture).withManualTerminationFutureCompletion().build());
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            CompletableFuture terminationFuture = dispatcherLeaderProcess.closeAsync();
            Assertions.assertThat(jobGraphStopFuture).isNotDone();
            Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
            dispatcherServiceTerminationFuture.complete(null);
            jobGraphStopFuture.get();
            terminationFuture.get();
        }
    }

    @Test
    public void unexpectedDispatcherServiceTerminationWhileRunning_callsFatalErrorHandler() {
        CompletableFuture terminationFuture = new CompletableFuture();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setTerminationFuture(terminationFuture).build());
        SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();
        FlinkException expectedFailure = new FlinkException("Expected test failure.");
        terminationFuture.completeExceptionally(expectedFailure);
        Throwable error = this.fatalErrorHandler.getErrorFuture().join();
        Assertions.assertThat((Throwable)error).getRootCause().isEqualTo((Object)expectedFailure);
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void unexpectedDispatcherServiceTerminationWhileNotRunning_doesNotCallFatalErrorHandler() {
        CompletableFuture terminationFuture = new CompletableFuture();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setTerminationFuture(terminationFuture).withManualTerminationFutureCompletion().build());
        SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();
        dispatcherLeaderProcess.closeAsync();
        FlinkException expectedFailure = new FlinkException("Expected test failure.");
        terminationFuture.completeExceptionally(expectedFailure);
        Assertions.assertThatThrownBy(() -> this.fatalErrorHandler.getErrorFuture().get(10L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
    }

    @Test
    public void confirmLeaderSessionFuture_completesAfterDispatcherServiceHasBeenStarted() throws Exception {
        OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
        String dispatcherAddress = "myAddress";
        TestingDispatcherGateway dispatcherGateway = ((TestingDispatcherGateway.Builder)TestingDispatcherGateway.newBuilder().setAddress("myAddress")).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> {
            try {
                createDispatcherServiceLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return TestingDispatcherGatewayService.newBuilder().setDispatcherGateway(dispatcherGateway).build();
        });
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            CompletableFuture confirmLeaderSessionFuture = dispatcherLeaderProcess.getLeaderAddressFuture();
            dispatcherLeaderProcess.start();
            Assertions.assertThat((CompletableFuture)confirmLeaderSessionFuture).isNotDone();
            createDispatcherServiceLatch.trigger();
            Assertions.assertThat((CompletableFuture)confirmLeaderSessionFuture).succeedsWithin(100L, TimeUnit.MILLISECONDS).isEqualTo((Object)"myAddress");
        }
    }

    @Test
    public void closeAsync_duringJobRecovery_preventsDispatcherServiceCreation() throws Exception {
        OneShotLatch jobRecoveryStartedLatch = new OneShotLatch();
        OneShotLatch completeJobRecoveryLatch = new OneShotLatch();
        OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setJobIdsFunction((FunctionWithException<Collection<JobID>, Collection<JobID>, ? extends Exception>)((FunctionWithException)storedJobs -> {
            jobRecoveryStartedLatch.trigger();
            completeJobRecoveryLatch.await();
            return storedJobs;
        })).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> {
            createDispatcherServiceLatch.trigger();
            return TestingDispatcherGatewayService.newBuilder().build();
        });
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            jobRecoveryStartedLatch.await();
            dispatcherLeaderProcess.closeAsync();
            completeJobRecoveryLatch.trigger();
            Assertions.assertThatThrownBy(() -> createDispatcherServiceLatch.await(10L, TimeUnit.MILLISECONDS), (String)"No dispatcher service should be created after the process has been stopped.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void onRemovedJobGraph_terminatesRunningJob() throws Exception {
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setInitialJobGraphs(Collections.singleton(JOB_GRAPH)).build();
        CompletableFuture terminateJobFuture = new CompletableFuture();
        TestingDispatcherGatewayService testingDispatcherService = TestingDispatcherGatewayService.newBuilder().setOnRemovedJobGraphFunction(jobID -> {
            terminateJobFuture.complete(jobID);
            return FutureUtils.completedVoidFuture();
        }).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> testingDispatcherService);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.jobGraphStore.globalCleanupAsync(JOB_GRAPH.getJobID(), (Executor)executorService).join();
            dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());
            Assertions.assertThat((Comparable)((Comparable)terminateJobFuture.get())).isEqualTo((Object)JOB_GRAPH.getJobID());
        }
        finally {
            Assertions.assertThat(executorService.shutdownNow()).isEmpty();
        }
    }

    @Test
    public void onRemovedJobGraph_failingRemovalCall_failsFatally() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        TestingDispatcherGatewayService testingDispatcherService = TestingDispatcherGatewayService.newBuilder().setOnRemovedJobGraphFunction(jobID -> FutureUtils.completedExceptionally((Throwable)testException)).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> testingDispatcherService);
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());
            Throwable fatalError = this.fatalErrorHandler.getErrorFuture().join();
            Assertions.assertThat((Throwable)fatalError).hasCause((Throwable)testException);
            this.fatalErrorHandler.clearError();
        }
    }

    @Test
    public void onAddedJobGraph_submitsRecoveredJob() throws Exception {
        CompletableFuture submittedJobFuture = new CompletableFuture();
        TestingDispatcherGateway testingDispatcherGateway = TestingDispatcherGateway.newBuilder().setSubmitFunction(submittedJob -> {
            submittedJobFuture.complete(submittedJob);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnGenericSupplier(() -> TestingDispatcherGatewayService.newBuilder().setDispatcherGateway(testingDispatcherGateway).build());
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());
            JobGraph submittedJobGraph = (JobGraph)submittedJobFuture.get();
            Assertions.assertThat((Comparable)submittedJobGraph.getJobID()).isEqualTo((Object)JOB_GRAPH.getJobID());
        }
    }

    @Test
    public void onAddedJobGraph_ifNotRunning_isBeingIgnored() throws Exception {
        CompletableFuture recoveredJobFuture = new CompletableFuture();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setRecoverJobGraphFunction((BiFunctionWithException<JobID, Map<JobID, JobGraph>, JobGraph, ? extends Exception>)((BiFunctionWithException)(jobId, jobGraphs) -> {
            recoveredJobFuture.complete(jobId);
            return (JobGraph)jobGraphs.get(jobId);
        })).build();
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.closeAsync();
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());
            Assertions.assertThatThrownBy(() -> {
                JobID cfr_ignored_0 = (JobID)recoveredJobFuture.get(10L, TimeUnit.MILLISECONDS);
            }, (String)"onAddedJobGraph should be ignored if the leader process is not running.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
        }
    }

    @Test
    public void onAddedJobGraph_failingRecovery_propagatesTheFailure() throws Exception {
        FlinkException expectedFailure = new FlinkException("Expected failure");
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setRecoverJobGraphFunction((BiFunctionWithException<JobID, Map<JobID, JobGraph>, JobGraph, ? extends Exception>)((BiFunctionWithException)(ignoredA, ignoredB) -> {
            throw expectedFailure;
        })).build();
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            this.jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());
            ((ListAssert)Assertions.assertThat(this.fatalErrorHandler.getErrorFuture()).succeedsWithin(100L, TimeUnit.MILLISECONDS).extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)).contains((Object[])new Throwable[]{expectedFailure});
            Assertions.assertThat((Comparable)dispatcherLeaderProcess.getState()).isEqualTo((Object)AbstractDispatcherLeaderProcess.State.STOPPED);
            this.fatalErrorHandler.clearError();
        }
    }

    @Test
    public void recoverJobs_withRecoveryFailure_failsFatally() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setRecoverJobGraphFunction((BiFunctionWithException<JobID, Map<JobID, JobGraph>, JobGraph, ? extends Exception>)((BiFunctionWithException)(ignoredA, ignoredB) -> {
            throw testException;
        })).setInitialJobGraphs(Collections.singleton(JOB_GRAPH)).build();
        this.runJobRecoveryFailureTest(testException);
    }

    @Test
    public void recoverJobs_withJobIdRecoveryFailure_failsFatally() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setJobIdsFunction((FunctionWithException<Collection<JobID>, Collection<JobID>, ? extends Exception>)((FunctionWithException)ignored -> {
            throw testException;
        })).build();
        this.runJobRecoveryFailureTest(testException);
    }

    private void runJobRecoveryFailureTest(FlinkException testException) throws Exception {
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            Assertions.assertThat(this.fatalErrorHandler.getErrorFuture()).succeedsWithin(100L, TimeUnit.MILLISECONDS).satisfies(error -> {
                AbstractThrowableAssert cfr_ignored_0 = (AbstractThrowableAssert)Assertions.assertThat((Throwable)error).satisfies(FlinkAssertions.anyCauseMatches(((Object)((Object)testException)).getClass(), (String)testException.getMessage()));
            });
            this.fatalErrorHandler.clearError();
        }
    }

    @Test
    public void onAddedJobGraph_failingRecoveredJobSubmission_failsFatally() throws Exception {
        TestingDispatcherGateway dispatcherGateway = TestingDispatcherGateway.newBuilder().setSubmitFunction(jobGraph -> FutureUtils.completedExceptionally((Throwable)new JobSubmissionException(jobGraph.getJobID(), "test exception"))).build();
        this.runOnAddedJobGraphTest(dispatcherGateway, (ThrowingConsumer<TestingFatalErrorHandler, Exception>)((ThrowingConsumer)this::verifyOnAddedJobGraphResultFailsFatally));
    }

    private void verifyOnAddedJobGraphResultFailsFatally(TestingFatalErrorHandler fatalErrorHandler) {
        Throwable actualCause = fatalErrorHandler.getErrorFuture().join();
        ((ListAssert)Assertions.assertThat((Throwable)actualCause).extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)).hasAtLeastOneElementOfType(JobSubmissionException.class);
        fatalErrorHandler.clearError();
    }

    @Test
    public void onAddedJobGraph_duplicateJobSubmissionDueToFalsePositive_willBeIgnored() throws Exception {
        TestingDispatcherGateway dispatcherGateway = TestingDispatcherGateway.newBuilder().setSubmitFunction(jobGraph -> FutureUtils.completedExceptionally((Throwable)DuplicateJobSubmissionException.of((JobID)jobGraph.getJobID()))).build();
        this.runOnAddedJobGraphTest(dispatcherGateway, (ThrowingConsumer<TestingFatalErrorHandler, Exception>)((ThrowingConsumer)this::verifyOnAddedJobGraphResultDidNotFail));
    }

    private void runOnAddedJobGraphTest(TestingDispatcherGateway dispatcherGateway, ThrowingConsumer<TestingFatalErrorHandler, Exception> verificationLogic) throws Exception {
        this.jobGraphStore = TestingJobGraphStore.newBuilder().setInitialJobGraphs(Collections.singleton(JOB_GRAPH)).build();
        this.dispatcherServiceFactory = this.createFactoryBasedOnJobGraphs(jobGraphs -> {
            Assertions.assertThat((Collection)jobGraphs).containsExactlyInAnyOrder((Object[])new JobGraph[]{JOB_GRAPH});
            return TestingDispatcherGatewayService.newBuilder().setDispatcherGateway(dispatcherGateway).build();
        });
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess = this.createDispatcherLeaderProcess();){
            dispatcherLeaderProcess.start();
            dispatcherLeaderProcess.getDispatcherGateway().get();
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());
            verificationLogic.accept((Object)this.fatalErrorHandler);
        }
    }

    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory createFactoryBasedOnJobGraphs(Function<Collection<JobGraph>, AbstractDispatcherLeaderProcess.DispatcherGatewayService> createFunction) {
        return (ignoredDispatcherId, recoveredJobs, ignoredRecoveredDirtyJobResults, ignoredJobGraphWriter, ignoredJobResultStore) -> (AbstractDispatcherLeaderProcess.DispatcherGatewayService)createFunction.apply(recoveredJobs);
    }

    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory createFactoryBasedOnGenericSupplier(Supplier<AbstractDispatcherLeaderProcess.DispatcherGatewayService> supplier) {
        return (ignoredDispatcherId, ignoredRecoveredJobs, ignoredRecoveredDirtyJobResults, ignoredJobGraphWriter, ignoredJobResultStore) -> (AbstractDispatcherLeaderProcess.DispatcherGatewayService)supplier.get();
    }

    private void verifyOnAddedJobGraphResultDidNotFail(TestingFatalErrorHandler fatalErrorHandler) {
        Assertions.assertThatThrownBy(() -> fatalErrorHandler.getErrorFuture().get(10L, TimeUnit.MILLISECONDS), (String)"Expected that duplicate job submissions due to false job recoveries are ignored.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
    }

    private SessionDispatcherLeaderProcess createDispatcherLeaderProcess() {
        return SessionDispatcherLeaderProcess.create((UUID)this.leaderSessionId, (AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory)this.dispatcherServiceFactory, (JobGraphStore)this.jobGraphStore, (JobResultStore)this.jobResultStore, (Executor)ioExecutor, (FatalErrorHandler)this.fatalErrorHandler);
    }
}

