/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.utils.JobResultUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests that the components of a {@link TestingMiniCluster} (dispatcher, job master and
 * resource manager / task executors) keep working when their leadership is revoked and
 * granted again through {@link EmbeddedHaServicesWithLeadershipControl}.
 *
 * <p>One mini cluster is started for the whole class; every test gets a fresh job graph
 * consisting of a single {@link BlockingOperator} vertex.
 */
public class LeaderChangeClusterComponentsTest extends TestLogger {
    private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L);
    private static final int SLOTS_PER_TM = 2;
    private static final int NUM_TMS = 2;
    /** Job parallelism; equals the total number of slots offered by the mini cluster. */
    public static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
    /**
     * System property set for the lifetime of this test class.
     * NOTE(review): presumably allows the resource manager to run more than one leader
     * session in tests - confirm against the code that reads this key.
     */
    private static final String RM_MULTI_LEADER_SESSION_PROPERTY =
            "flink.tests.enable-rm-multi-leader-session";

    private static TestingMiniCluster miniCluster;
    private static EmbeddedHaServicesWithLeadershipControl highAvailabilityServices;
    /** Snapshot of the system properties taken before this class modified them. */
    private static Properties sysProps;

    private JobGraph jobGraph;
    private JobID jobId;

    /**
     * Snapshots the system properties, sets the multi-leader-session property and starts
     * the shared mini cluster backed by the controllable HA services.
     *
     * @throws Exception if the mini cluster cannot be started
     */
    @BeforeClass
    public static void setupClass() throws Exception {
        // System.getProperties() returns the live property table, not a copy. Keeping that
        // reference would make the restore in teardownClass() a no-op, so clone it.
        sysProps = (Properties) System.getProperties().clone();
        System.setProperty(RM_MULTI_LEADER_SESSION_PROPERTY, "");

        highAvailabilityServices =
                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
        miniCluster =
                TestingMiniCluster.newBuilder(
                                TestingMiniClusterConfiguration.newBuilder()
                                        .setNumTaskManagers(NUM_TMS)
                                        .setNumSlotsPerTaskManager(SLOTS_PER_TM)
                                        .build())
                        .setHighAvailabilityServicesSupplier(() -> highAvailabilityServices)
                        .build();
        miniCluster.start();
    }

    /**
     * Creates a fresh blocking job graph for the test about to run.
     *
     * <p>As a side effect of {@link #createJobGraph(int)}, {@link BlockingOperator} is reset
     * to blocking mode.
     */
    @Before
    public void setup() throws Exception {
        this.jobGraph = this.createJobGraph(PARALLELISM);
        this.jobId = this.jobGraph.getJobID();
    }

    /**
     * Closes the mini cluster and restores the system properties captured in
     * {@link #setupClass()}.
     *
     * @throws Exception if closing the mini cluster fails
     */
    @AfterClass
    public static void teardownClass() throws Exception {
        try {
            if (miniCluster != null) {
                miniCluster.close();
            }
        } finally {
            // Restore even when close() throws so the property does not leak into other
            // tests running in the same JVM. The null guard avoids System.setProperties(null),
            // which would reset all properties, should the snapshot never have been taken.
            if (sysProps != null) {
                System.setProperties(sysProps);
            }
        }
    }

    /**
     * Revokes dispatcher leadership while a blocking job is running and expects the pending
     * job result to complete with {@link ApplicationStatus#UNKNOWN}; after leadership is
     * granted again the same job graph is resubmitted in non-blocking mode and must succeed.
     */
    @Test
    public void testReelectionOfDispatcher() throws Exception {
        final CompletableFuture<?> submissionFuture = miniCluster.submitJob(this.jobGraph);
        submissionFuture.get();

        final CompletableFuture<JobResult> jobResultFuture =
                miniCluster.requestJobResult(this.jobId);

        highAvailabilityServices.revokeDispatcherLeadership().get();

        JobResult jobResult = jobResultFuture.get();
        // JUnit convention: expected value first, actual value second.
        Assert.assertEquals(ApplicationStatus.UNKNOWN, jobResult.getApplicationStatus());

        highAvailabilityServices.grantDispatcherLeadership();

        // Let the resubmitted job run to completion instead of blocking.
        BlockingOperator.isBlocking = false;

        final CompletableFuture<?> submissionFuture2 = miniCluster.submitJob(this.jobGraph);
        submissionFuture2.get();

        final CompletableFuture<JobResult> jobResultFuture2 =
                miniCluster.requestJobResult(this.jobId);
        jobResult = jobResultFuture2.get();

        JobResultUtils.assertSuccess(jobResult);
    }

    /**
     * Revokes job master leadership for a running blocking job, checks that the job result
     * is still incomplete, then unblocks the operator, grants leadership again and expects
     * the originally requested job result to complete successfully.
     */
    @Test
    public void testReelectionOfJobMaster() throws Exception {
        final CompletableFuture<?> submissionFuture = miniCluster.submitJob(this.jobGraph);
        submissionFuture.get();

        final CompletableFuture<JobResult> jobResultFuture =
                miniCluster.requestJobResult(this.jobId);

        // Wait for the job manager to be initialized before touching its leadership.
        CommonTestUtils.waitUntilJobManagerIsInitialized(
                () -> miniCluster.getJobStatus(this.jobId).get());

        highAvailabilityServices.revokeJobMasterLeadership(this.jobId).get();

        JobResultUtils.assertIncomplete(jobResultFuture);

        // Unblock before re-granting leadership so the job can finish afterwards.
        BlockingOperator.isBlocking = false;

        highAvailabilityServices.grantJobMasterLeadership(this.jobId);

        final JobResult jobResult = jobResultFuture.get();
        JobResultUtils.assertSuccess(jobResult);
    }

    /**
     * Revokes and re-grants resource manager leadership and expects a leader session id to
     * be retrievable and all task executors to be reported as connected again, all within
     * {@link #TESTING_TIMEOUT}.
     */
    @Test
    public void testTaskExecutorsReconnectToClusterWithLeadershipChange() throws Exception {
        final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT);
        this.waitUntilTaskExecutorsHaveConnected(NUM_TMS, deadline);

        highAvailabilityServices.revokeResourceManagerLeadership().get();
        highAvailabilityServices.grantResourceManagerLeadership();

        // A non-null session id shows that a resource manager leader has been announced.
        Assert.assertThat(
                LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
                                highAvailabilityServices.getResourceManagerLeaderRetriever(),
                                TESTING_TIMEOUT)
                        .getLeaderSessionId(),
                Matchers.is(Matchers.notNullValue()));

        this.waitUntilTaskExecutorsHaveConnected(NUM_TMS, deadline);
    }

    /**
     * Waits until the cluster overview reports exactly {@code numTaskExecutors} connected
     * task managers, passing a retry interval of 10 to the wait utility.
     *
     * @param numTaskExecutors expected number of connected task managers
     * @param deadline point in time after which waiting is given up
     * @throws Exception if the condition check fails or the wait utility gives up
     */
    private void waitUntilTaskExecutorsHaveConnected(int numTaskExecutors, Deadline deadline)
            throws Exception {
        CommonTestUtils.waitUntilCondition(
                () ->
                        miniCluster.requestClusterOverview().get().getNumTaskManagersConnected()
                                == numTaskExecutors,
                deadline,
                10L);
    }

    /**
     * Builds a streaming job graph with a single {@link BlockingOperator} vertex.
     *
     * <p>Side effect: switches {@link BlockingOperator} back to blocking mode.
     *
     * @param parallelism parallelism of the single vertex
     * @return the job graph containing the blocking vertex
     */
    private JobGraph createJobGraph(int parallelism) {
        BlockingOperator.isBlocking = true;
        final JobVertex vertex = new JobVertex("blocking operator");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(BlockingOperator.class);
        return JobGraphTestUtils.streamingJobGraph(vertex);
    }

    /**
     * Invokable that either returns immediately or waits indefinitely, depending on the
     * value of {@link #isBlocking} at the moment {@link #invoke()} is entered.
     */
    public static class BlockingOperator extends AbstractInvokable {
        /**
         * Written by the test thread and read by the threads running {@link #invoke()};
         * volatile so that a mode switch is guaranteed to be visible to them.
         */
        static volatile boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        /**
         * Returns immediately in non-blocking mode; otherwise waits on this instance's
         * monitor in an endless loop, which only ends when {@code wait()} throws
         * (e.g. an {@link InterruptedException}).
         */
        @Override
        public void invoke() throws Exception {
            if (isBlocking) {
                synchronized (this) {
                    // Loop forever so that notifications and spurious wake-ups do not
                    // release the task.
                    while (true) {
                        this.wait();
                    }
                }
            }
        }
    }
}

