/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.Tasks$BlockingOnceReceiver$;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.leaderelection.LeaderElectionRetrievalTestingCluster;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Tests around leader changes in a {@link LeaderElectionRetrievalTestingCluster} while a
 * blocking job is running.
 *
 * <p>Every test first elects JobManager 0, submits the blocking job built by
 * {@link #createBlockingJob(int)}, waits until all vertices are running or finished and then
 * triggers some form of leader change. Each test then waits for the reply to a
 * {@code NotifyWhenJobRemoved} request that was sent to the first leader.
 */
public class LeaderChangeStateCleanupTest
extends TestLogger {
    /** Timeout used for all asks and awaits unless a test needs a shorter one. */
    private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();

    private static final int NUM_JMS = 2;
    private static final int NUM_TMS = 2;
    private static final int NUM_SLOTS_PER_TM = 2;

    /** Job parallelism; equal to the total number of configured task slots. */
    private static final int PARALLELISM = NUM_TMS * NUM_SLOTS_PER_TM;

    private Configuration configuration;
    private LeaderElectionRetrievalTestingCluster cluster = null;

    // JUnit instantiates the test class once per test method, so every test gets its own graph.
    private final JobGraph job = this.createBlockingJob(PARALLELISM);

    /**
     * Re-arms the blocking flag of the receiver task and starts a fresh testing cluster with
     * {@code NUM_JMS} JobManagers, {@code NUM_TMS} TaskManagers and {@code NUM_SLOTS_PER_TM}
     * slots per TaskManager.
     */
    @Before
    public void before() throws Exception {
        // A previous test may have switched the flag off; reset it before anything is submitted.
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

        this.configuration = new Configuration();
        this.configuration.setInteger("local.number-jobmanager", NUM_JMS);
        this.configuration.setInteger("local.number-taskmanager", NUM_TMS);
        this.configuration.setInteger("taskmanager.numberOfTaskSlots", NUM_SLOTS_PER_TM);

        this.cluster = new LeaderElectionRetrievalTestingCluster(this.configuration, true, false, StreamingMode.BATCH_ONLY);
        this.cluster.start(false);
        this.cluster.waitForActorsToBeAlive();
    }

    /** Stops the cluster if {@link #before()} managed to create one. */
    @After
    public void after() {
        if (this.cluster != null) {
            this.cluster.stop();
        }
    }

    /**
     * Grants leadership to JobManager 1 and notifies the retrieval listeners about it, then
     * checks that the job-removal notification arrives, that the leader gateway reports
     * {@code PARALLELISM} total slots, and that the job can be run to completion afterwards.
     */
    @Test
    public void testStateCleanupAfterNewLeaderElectionAndListenerNotification() throws Exception {
        UUID leaderSessionID1 = UUID.randomUUID();
        UUID leaderSessionID2 = UUID.randomUUID();

        Future<Object> jobRemoval = this.startBlockingJobOnFirstJobManager(leaderSessionID1);

        this.cluster.grantLeadership(1, leaderSessionID2);
        this.cluster.notifyRetrievalListeners(1, leaderSessionID2);
        Await.ready(jobRemoval, timeout);

        this.cluster.waitForTaskManagersToBeRegistered();
        ActorGateway jm2 = this.cluster.getLeaderGateway(timeout);
        Future<Object> futureNumberSlots = jm2.ask(JobManagerMessages.getRequestTotalNumberOfSlots(), timeout);
        int numberSlots = (Integer) Await.result(futureNumberSlots, timeout);
        Assert.assertEquals(PARALLELISM, numberSlots);

        this.unblockAndRunJob(jm2);
    }

    /**
     * Grants leadership to JobManager 1 without notifying the retrieval listeners and waits for
     * the job-removal notification requested from the first leader.
     */
    @Test
    public void testStateCleanupAfterNewLeaderElection() throws Exception {
        UUID leaderSessionID = UUID.randomUUID();
        UUID newLeaderSessionID = UUID.randomUUID();

        Future<Object> jobRemoval = this.startBlockingJobOnFirstJobManager(leaderSessionID);

        this.cluster.grantLeadership(1, newLeaderSessionID);
        Await.ready(jobRemoval, timeout);
    }

    /**
     * Notifies the retrieval listeners about JobManager 1 with a new session ID, without
     * granting it leadership, and waits for the job-removal notification requested from the
     * first leader.
     */
    @Test
    public void testStateCleanupAfterListenerNotification() throws Exception {
        UUID leaderSessionID = UUID.randomUUID();
        UUID newLeaderSessionID = UUID.randomUUID();

        Future<Object> jobRemoval = this.startBlockingJobOnFirstJobManager(leaderSessionID);

        this.cluster.notifyRetrievalListeners(1, newLeaderSessionID);
        Await.ready(jobRemoval, timeout);
    }

    /**
     * Grants leadership to JobManager 0 a second time under a new session ID.
     *
     * <p>Before the retrieval listeners are notified about the new session ID, waiting for the
     * TaskManagers to register is expected to time out. After the notification the registration
     * wait must succeed and the job is run to completion.
     */
    @Test
    public void testReelectionOfSameJobManager() throws Exception {
        UUID leaderSessionID = UUID.randomUUID();
        UUID newLeaderSessionID = UUID.randomUUID();
        FiniteDuration shortTimeout = new FiniteDuration(10L, TimeUnit.SECONDS);

        Future<Object> jobRemoval = this.startBlockingJobOnFirstJobManager(leaderSessionID);

        this.cluster.grantLeadership(0, newLeaderSessionID);
        Await.ready(jobRemoval, timeout);

        try {
            this.cluster.waitForTaskManagersToBeRegistered(shortTimeout);
            Assert.fail("TaskManager should not be able to register at JobManager.");
        }
        catch (TimeoutException expected) {
            // Expected: the listeners have not been told about the new session ID yet, so the
            // registration wait has to run into the short timeout.
        }

        this.cluster.notifyRetrievalListeners(0, newLeaderSessionID);
        this.cluster.waitForTaskManagersToBeRegistered();
        ActorGateway leaderGateway = this.cluster.getLeaderGateway(timeout);

        this.unblockAndRunJob(leaderGateway);
    }

    /**
     * Builds a two-vertex job ("sender" feeding "receiver" point-wise) in which both vertices
     * use the given parallelism and share one slot sharing group. The receiver uses
     * {@code Tasks.BlockingOnceReceiver}; its blocking flag is switched on as a side effect.
     *
     * @param parallelism parallelism applied to both the sender and the receiver vertex
     * @return a new job graph named "Blocking test job"
     */
    public JobGraph createBlockingJob(int parallelism) {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

        JobVertex sender = new JobVertex("sender");
        JobVertex receiver = new JobVertex("receiver");
        sender.setInvokableClass(Tasks.Sender.class);
        receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        sender.setParallelism(parallelism);
        receiver.setParallelism(parallelism);
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);

        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        sender.setSlotSharingGroup(slotSharingGroup);
        receiver.setSlotSharingGroup(slotSharingGroup);

        return new JobGraph("Blocking test job", new JobVertex[]{sender, receiver});
    }

    /**
     * Common prelude of all tests: elects JobManager 0 under the given session ID, notifies the
     * retrieval listeners, waits for the TaskManagers to register, submits the blocking job
     * detached and waits until all of its vertices are running or finished.
     *
     * @param leaderSessionID session ID under which JobManager 0 is granted leadership
     * @return future of the reply to a {@code NotifyWhenJobRemoved} request for the job, sent to
     *     the leader gateway obtained after the election
     */
    private Future<Object> startBlockingJobOnFirstJobManager(UUID leaderSessionID) throws Exception {
        this.cluster.grantLeadership(0, leaderSessionID);
        this.cluster.notifyRetrievalListeners(0, leaderSessionID);
        this.cluster.waitForTaskManagersToBeRegistered();

        this.cluster.submitJobDetached(this.job);

        ActorGateway jm = this.cluster.getLeaderGateway(timeout);
        Future<Object> wait = jm.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(this.job.getJobID()), timeout);
        Await.ready(wait, timeout);

        // The request must be sent before the leader change so the reply covers this job.
        return jm.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(this.job.getJobID()), timeout);
    }

    /**
     * Switches the receiver's blocking flag off and submits the job again, waiting for its
     * result via a retrieval service that points at the given leader gateway.
     *
     * @param leader gateway whose path and leader session ID are used for job submission
     */
    private void unblockAndRunJob(ActorGateway leader) throws Exception {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
        this.cluster.submitJobAndWait(this.job, false, timeout, new TestingLeaderRetrievalService(leader.path(), leader.leaderSessionID()));
    }
}

