/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.shuffle;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Runs a small batch job on a {@link MiniCluster} configured with {@link TestShuffleServiceFactory}
 * and verifies the calls received by the resulting {@link TestShuffleMaster}: it must end up
 * closed, and the recorded partition registration / external release events must match the
 * expected sequence.
 */
class ShuffleMasterTest {
    /** Configuration key read by {@link TestShuffleMaster} to enable the stop-tracking scenario. */
    private static final String STOP_TRACKING_PARTITION_KEY = "stop_tracking_partition_key";

    /** Event recorded on every {@code registerPartitionWithProducer} call. */
    private static final String PARTITION_REGISTRATION_EVENT = "registerPartitionWithProducer";

    /** Event recorded on every {@code releasePartitionExternally} call. */
    private static final String EXTERNAL_PARTITION_RELEASE_EVENT = "releasePartitionExternally";

    /** Resets the static state shared through {@link TestShuffleMaster} before each test. */
    @BeforeEach
    void before() {
        // Drop any instance left over from a previous test so that the "closed" assertion can
        // never be satisfied by a stale shuffle master.
        TestShuffleMaster.currentInstance.set(null);
        TestShuffleMaster.partitionEvents.clear();
    }

    /**
     * Runs the job without the stop-tracking scenario and expects two partition registrations
     * followed by two external releases, with the shuffle master closed afterwards.
     */
    @Test
    void testShuffleMasterLifeCycle() throws Exception {
        try (MiniCluster cluster = new MiniCluster(createClusterConfiguration(false))) {
            cluster.start();
            cluster.executeJobBlocking(createJobGraph());
        }
        Assertions.assertThat(TestShuffleMaster.currentInstance.get().closed).isTrue();
        Assertions.assertThat(TestShuffleMaster.partitionEvents)
                .containsExactly(
                        PARTITION_REGISTRATION_EVENT,
                        PARTITION_REGISTRATION_EVENT,
                        EXTERNAL_PARTITION_RELEASE_EVENT,
                        EXTERNAL_PARTITION_RELEASE_EVENT);
    }

    /**
     * Runs the job with the stop-tracking scenario enabled and expects four partition
     * registrations followed by two external releases, with the shuffle master closed afterwards.
     *
     * <p>NOTE(review): the two extra registrations presumably come from the job restarting after
     * {@link TestShuffleMaster} fails one registration on purpose - confirm against the scheduler.
     */
    @Test
    void testStopTrackingPartition() throws Exception {
        try (MiniCluster cluster = new MiniCluster(createClusterConfiguration(true))) {
            cluster.start();
            cluster.executeJobBlocking(createJobGraph());
        }
        Assertions.assertThat(TestShuffleMaster.currentInstance.get().closed).isTrue();
        Assertions.assertThat(TestShuffleMaster.partitionEvents)
                .containsExactly(
                        PARTITION_REGISTRATION_EVENT,
                        PARTITION_REGISTRATION_EVENT,
                        PARTITION_REGISTRATION_EVENT,
                        PARTITION_REGISTRATION_EVENT,
                        EXTERNAL_PARTITION_RELEASE_EVENT,
                        EXTERNAL_PARTITION_RELEASE_EVENT);
    }

    /**
     * Builds a mini cluster configuration (random ports, one task manager with one slot) that
     * uses {@link TestShuffleServiceFactory} as the shuffle service factory.
     *
     * @param stopTrackingPartition value stored under {@link #STOP_TRACKING_PARTITION_KEY}
     * @return the mini cluster configuration
     */
    private MiniClusterConfiguration createClusterConfiguration(boolean stopTrackingPartition) {
        Configuration configuration = new Configuration();
        configuration.set(
                ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS,
                TestShuffleServiceFactory.class.getName());
        configuration.setBoolean(STOP_TRACKING_PARTITION_KEY, stopTrackingPartition);
        return new MiniClusterConfiguration.Builder()
                .withRandomPorts()
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(1)
                .setConfiguration(configuration)
                .build();
    }

    /**
     * Builds a batch job graph with a "source" and a "sink" vertex (parallelism 2 each, both
     * running {@link NoOpInvokable}) connected all-to-all through a blocking result partition,
     * using a fixed-delay restart strategy (2 attempts, 2 seconds delay).
     *
     * @return the job graph to execute
     */
    private JobGraph createJobGraph() throws Exception {
        JobVertex source = new JobVertex("source");
        source.setParallelism(2);
        source.setInvokableClass(NoOpInvokable.class);

        JobVertex sink = new JobVertex("sink");
        sink.setParallelism(2);
        sink.setInvokableClass(NoOpInvokable.class);
        sink.connectNewDataSetAsInput(
                source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
        ExecutionConfig config = new ExecutionConfig();
        config.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2L)));
        jobGraph.setExecutionConfig(config);
        return jobGraph;
    }

    /**
     * A {@link NettyShuffleMaster} that checks the order of the calls it receives, records
     * partition events in a static queue and, when configured to, fails one partition
     * registration after asking the job context to stop tracking an earlier partition.
     */
    private static class TestShuffleMaster extends NettyShuffleMaster {
        /** The most recently constructed instance, so that the test can inspect it. */
        private static final AtomicReference<TestShuffleMaster> currentInstance =
                new AtomicReference<>();

        /** Partition events recorded by all instances, in call order. */
        private static final BlockingQueue<String> partitionEvents = new LinkedBlockingQueue<>();

        private final AtomicBoolean started = new AtomicBoolean();
        private final AtomicBoolean closed = new AtomicBoolean();

        /** IDs of the partitions registered successfully so far. */
        private final BlockingQueue<ResultPartitionID> partitions = new LinkedBlockingQueue<>();

        /** Context of the currently registered job, or {@code null} if none is registered. */
        private final AtomicReference<JobShuffleContext> jobContext = new AtomicReference<>();

        private final boolean stopTrackingPartition;

        /**
         * Creates the shuffle master and publishes it through {@link #currentInstance}.
         *
         * @param conf configuration from which {@link #STOP_TRACKING_PARTITION_KEY} is read
         *     (defaults to {@code false})
         */
        public TestShuffleMaster(Configuration conf) {
            super(conf);
            this.stopTrackingPartition = conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false);
            currentInstance.set(this);
        }

        /** Asserts that the master is neither started nor closed yet, then marks it started. */
        @Override
        public void start() throws Exception {
            Assertions.assertThat(started).isFalse();
            Assertions.assertThat(closed).isFalse();
            started.set(true);
            super.start();
        }

        /** Asserts that the master is started and not closed yet, then marks it closed. */
        @Override
        public void close() throws Exception {
            assertShuffleMasterAlive();
            closed.set(true);
            super.close();
        }

        /** Asserts that no job is registered yet and stores the given context. */
        @Override
        public void registerJob(JobShuffleContext context) {
            assertShuffleMasterAlive();
            Assertions.assertThat(jobContext.compareAndSet(null, context)).isTrue();
            super.registerJob(context);
        }

        /** Asserts that a job is registered and clears the stored context. */
        @Override
        public void unregisterJob(JobID jobID) {
            assertJobRegistered();
            jobContext.set(null);
            super.unregisterJob(jobID);
        }

        /**
         * Records a registration event and delegates to the parent implementation.
         *
         * <p>If the stop-tracking scenario is enabled and exactly one partition has been
         * registered before this call, the job context is asked to stop tracking and release
         * that first partition, and the returned future is failed once that request finishes.
         * In every other case the returned future completes with the parent's descriptor.
         *
         * @return a future holding the shuffle descriptor, or failed as described above
         */
        @Override
        public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
                JobID jobID,
                PartitionDescriptor partitionDescriptor,
                ProducerDescriptor producerDescriptor) {
            assertJobRegistered();
            partitionEvents.add(PARTITION_REGISTRATION_EVENT);

            CompletableFuture<NettyShuffleDescriptor> future = new CompletableFuture<>();
            try {
                NettyShuffleDescriptor shuffleDescriptor =
                        super.registerPartitionWithProducer(
                                        jobID, partitionDescriptor, producerDescriptor)
                                .get();
                if (partitions.size() == 1 && stopTrackingPartition) {
                    // whenComplete (rather than thenRun) guarantees the returned future is
                    // always completed, even if the stop-tracking request itself fails;
                    // otherwise the caller could wait forever.
                    jobContext
                            .get()
                            .stopTrackingAndReleasePartitions(
                                    Collections.singletonList(partitions.peek()))
                            .whenComplete(
                                    (ignored, failure) ->
                                            future.completeExceptionally(
                                                    failure != null
                                                            ? failure
                                                            : new Exception("Test")));
                } else {
                    future.complete(shuffleDescriptor);
                }
                partitions.add(shuffleDescriptor.getResultPartitionID());
            } catch (Throwable throwable) {
                // Surface any failure through the future instead of throwing to the caller.
                future.completeExceptionally(throwable);
            }
            return future;
        }

        /** Records an external release event and delegates to the parent implementation. */
        @Override
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            assertJobRegistered();
            partitionEvents.add(EXTERNAL_PARTITION_RELEASE_EVENT);
            super.releasePartitionExternally(shuffleDescriptor);
        }

        /** Asserts that the master has been started and has not been closed. */
        private void assertShuffleMasterAlive() {
            Assertions.assertThat(closed).isFalse();
            Assertions.assertThat(started).isTrue();
        }

        /** Asserts that the master is alive and that a job context is currently stored. */
        private void assertJobRegistered() {
            assertShuffleMasterAlive();
            // Check the referenced context, not the AtomicReference holder: the holder is a
            // final field and therefore never null, which would make the assertion vacuous.
            Assertions.assertThat(jobContext.get()).isNotNull();
        }
    }

    /** Shuffle service factory that creates a {@link TestShuffleMaster}. */
    public static class TestShuffleServiceFactory extends NettyShuffleServiceFactory {
        /**
         * Creates a {@link TestShuffleMaster} from the configuration of the given context.
         *
         * @param shuffleMasterContext context supplying the configuration
         * @return the new shuffle master
         */
        @Override
        public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
            return new TestShuffleMaster(shuffleMasterContext.getConfiguration());
        }
    }
}

