/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.BitSet;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Runs sender/receiver jobs whose vertex parallelism is equal to or twice the
 * value of {@link #PARALLELISM} on a shared testing cluster.
 *
 * <p>The vertices are connected all-to-all through a blocking result and the
 * job graph allows queued scheduling. Each job is submitted and waited for;
 * a failing job surfaces as an exception from the test method.
 */
public class SlotCountExceedingParallelismTest {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    /** Product of the two cluster-size constants above. */
    private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
    private static TestingCluster flink;

    /** Starts the testing cluster that every test in this class shares. */
    @BeforeClass
    public static void setUp() throws Exception {
        // NOTE(review): argument order assumed to be (slots per TM, number of TMs);
        // both are 2, so the started cluster is the same either way.
        flink = TestingUtils.startTestingCluster(
                NUMBER_OF_SLOTS_PER_TM, NUMBER_OF_TMS, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    }

    /** Stops the shared cluster if {@link #setUp()} managed to start it. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (flink != null) {
            flink.stop();
        }
    }

    /**
     * Submits three jobs in sequence: sender parallelism doubled, receiver
     * parallelism doubled, and both doubled relative to {@link #PARALLELISM}.
     */
    @Test
    public void testNoSlotSharingAndBlockingResult() throws Exception {
        final String jobName = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";

        // Sender parallelism is twice PARALLELISM.
        submitJobGraphAndWait(createTestJobGraph(jobName, PARALLELISM * 2, PARALLELISM));

        // Receiver parallelism is twice PARALLELISM.
        submitJobGraphAndWait(createTestJobGraph(jobName, PARALLELISM, PARALLELISM * 2));

        // Both vertices have twice PARALLELISM.
        submitJobGraphAndWait(createTestJobGraph(jobName, PARALLELISM * 2, PARALLELISM * 2));
    }

    /**
     * Submits the job to the shared cluster and waits for it, bounded by
     * {@code TestingUtils.TESTING_DURATION()}.
     *
     * @param jobGraph the job to run
     * @throws JobExecutionException propagated from the cluster's {@code submitJobAndWait}
     */
    private void submitJobGraphAndWait(JobGraph jobGraph) throws JobExecutionException {
        flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
    }

    /**
     * Builds a two-vertex job: a {@link RoundRobinSubtaskIndexSender} vertex feeding a
     * {@link SubtaskIndexReceiver} vertex all-to-all through a blocking result.
     *
     * <p>The sender is configured to emit once per receiver subtask, and the receiver
     * is configured to expect one index per sender subtask.
     *
     * @param jobName             name given to the job graph
     * @param senderParallelism   parallelism of the sender vertex
     * @param receiverParallelism parallelism of the receiver vertex
     * @return a job graph with queued scheduling enabled
     */
    private JobGraph createTestJobGraph(String jobName, int senderParallelism, int receiverParallelism) {
        JobVertex sender = new JobVertex("Sender");
        sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
        sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism);
        sender.setParallelism(senderParallelism);

        JobVertex receiver = new JobVertex("Receiver");
        receiver.setInvokableClass(SubtaskIndexReceiver.class);
        receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism);
        receiver.setParallelism(receiverParallelism);

        receiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        JobGraph jobGraph = new JobGraph(jobName, new JobVertex[]{sender, receiver});
        jobGraph.setAllowQueuedScheduling(true);
        return jobGraph;
    }

    /**
     * Reads {@link IntValue} records from input gate 0 and verifies that exactly the
     * configured number of distinct subtask indexes arrives, each one only once.
     */
    public static class SubtaskIndexReceiver
    extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-indexes-to-receive";
        private RecordReader<IntValue> reader;
        private int numberOfSubtaskIndexesToReceive;
        private BitSet receivedSubtaskIndexes;

        /** Creates the reader and loads the expected index count (0 when not configured). */
        public void registerInputOutput() {
            this.reader = new RecordReader<IntValue>(this.getEnvironment().getInputGate(0), IntValue.class);
            this.numberOfSubtaskIndexesToReceive = this.getTaskConfiguration().getInteger(CONFIG_KEY, 0);
            this.receivedSubtaskIndexes = new BitSet(this.numberOfSubtaskIndexesToReceive);
        }

        /**
         * Drains the reader until it returns {@code null}, validating every record.
         *
         * @throws IllegalStateException if more records than expected arrive, an index
         *                               arrives twice, or fewer distinct indexes than
         *                               expected were seen when the input ends
         */
        public void invoke() throws Exception {
            try {
                IntValue record;
                int numberOfReceivedSubtaskIndexes = 0;
                while ((record = this.reader.next()) != null) {
                    // Count first so that a surplus record fails before it is inspected.
                    if (++numberOfReceivedSubtaskIndexes > this.numberOfSubtaskIndexesToReceive) {
                        throw new IllegalStateException("Received more records than expected.");
                    }
                    int subtaskIndex = record.getValue();
                    if (this.receivedSubtaskIndexes.get(subtaskIndex)) {
                        throw new IllegalStateException("Received expected subtask index twice.");
                    }
                    this.receivedSubtaskIndexes.set(subtaskIndex, true);
                }
                if (this.receivedSubtaskIndexes.cardinality() != this.numberOfSubtaskIndexesToReceive) {
                    throw new IllegalStateException("Finished receive, but did not receive all expected subtask indexes.");
                }
            }
            finally {
                // Runs on both the success and the failure path.
                this.reader.clearBuffers();
            }
        }
    }

    /**
     * Emits this subtask's own index the configured number of times through writer 0.
     *
     * <p>NOTE(review): the class name suggests the writer spreads the records
     * round-robin over its output channels; that behaviour belongs to
     * {@link RecordWriter} and is not visible in this file.
     */
    public static class RoundRobinSubtaskIndexSender
    extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-times-to-send";
        private RecordWriter<IntValue> writer;
        private int numberOfTimesToSend;

        /** Creates the writer and loads the emit count (0 when not configured). */
        public void registerInputOutput() {
            this.writer = new RecordWriter<IntValue>(this.getEnvironment().getWriter(0));
            this.numberOfTimesToSend = this.getTaskConfiguration().getInteger(CONFIG_KEY, 0);
        }

        /** Emits the subtask index {@code numberOfTimesToSend} times, then flushes. */
        public void invoke() throws Exception {
            IntValue subtaskIndex = new IntValue(this.getEnvironment().getIndexInSubtaskGroup());
            try {
                for (int i = 0; i < this.numberOfTimesToSend; ++i) {
                    this.writer.emit(subtaskIndex);
                }
                this.writer.flush();
            }
            finally {
                // Runs on both the success and the failure path.
                this.writer.clearBuffers();
            }
        }
    }
}

