/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime.leaderelection;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ZooKeeperLeaderElectionITCase
extends TestLogger {
    private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5L);
    private static final Time RPC_TIMEOUT = Time.minutes((long)1L);
    private static TestingServer zkServer;
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer(true);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (zkServer != null) {
            zkServer.close();
            zkServer = null;
        }
    }

    @Test
    public void testJobExecutionOnClusterWithLeaderChange() throws Exception {
        int numDispatchers = 3;
        int numTMs = 2;
        int numSlotsPerTM = 2;
        Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig((String)zkServer.getConnectString(), (String)this.tempFolder.newFolder().getAbsolutePath());
        configuration.setLong(ClusterOptions.REFUSED_REGISTRATION_DELAY, 50L);
        TestingMiniClusterConfiguration miniClusterConfiguration = new TestingMiniClusterConfiguration.Builder().setConfiguration(configuration).setNumberDispatcherResourceManagerComponents(3).setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build();
        Deadline timeout = Deadline.fromNow((Duration)TEST_TIMEOUT);
        try (TestingMiniCluster miniCluster = new TestingMiniCluster(miniClusterConfiguration);){
            miniCluster.start();
            int parallelism = 4;
            JobGraph jobGraph = this.createJobGraph(4);
            miniCluster.submitJob(jobGraph).get();
            String previousLeaderAddress = null;
            for (int i = 0; i < 2; ++i) {
                DispatcherGateway leaderDispatcherGateway = this.getNextLeadingDispatcherGateway(miniCluster, previousLeaderAddress, timeout);
                previousLeaderAddress = leaderDispatcherGateway.getAddress();
                CommonTestUtils.waitUntilCondition(() -> leaderDispatcherGateway.requestJobStatus(jobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING, (Deadline)timeout, (long)50L);
                leaderDispatcherGateway.shutDownCluster();
            }
            DispatcherGateway leaderDispatcherGateway = this.getNextLeadingDispatcherGateway(miniCluster, previousLeaderAddress, timeout);
            CommonTestUtils.waitUntilCondition(() -> leaderDispatcherGateway.requestJobStatus(jobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING, (Deadline)timeout, (long)50L);
            CompletableFuture jobResultFuture = leaderDispatcherGateway.requestJobResult(jobGraph.getJobID(), RPC_TIMEOUT);
            BlockingOperator.unblock();
            Assert.assertThat((Object)((JobResult)jobResultFuture.get()).isSuccess(), (Matcher)Matchers.is((Object)true));
        }
    }

    private DispatcherGateway getNextLeadingDispatcherGateway(TestingMiniCluster miniCluster, @Nullable String previousLeaderAddress, Deadline timeout) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> !((DispatcherGateway)miniCluster.getDispatcherGatewayFuture().get()).getAddress().equals(previousLeaderAddress), (Deadline)timeout, (long)20L);
        return (DispatcherGateway)miniCluster.getDispatcherGatewayFuture().get();
    }

    private JobGraph createJobGraph(int parallelism) throws IOException {
        BlockingOperator.isBlocking = true;
        JobVertex vertex = new JobVertex("blocking operator");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(BlockingOperator.class);
        JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{vertex});
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)10, (long)Duration.ofSeconds(10L).toMillis()));
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }

    public static class BlockingOperator
    extends AbstractInvokable {
        private static final Object lock = new Object();
        private static volatile boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            Object object = lock;
            synchronized (object) {
                while (isBlocking) {
                    lock.wait();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void unblock() {
            Object object = lock;
            synchronized (object) {
                isBlocking = false;
                lock.notifyAll();
            }
        }
    }
}

