/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.Arrays;
import java.util.BitSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;

public class SlotCountExceedingParallelismTest
extends TestLogger {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(SlotCountExceedingParallelismTest.getFlinkConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    private static Configuration getFlinkConfiguration() {
        Configuration config = new Configuration();
        config.set(RpcOptions.ASK_TIMEOUT_DURATION, (Object)TestingUtils.DEFAULT_ASK_TIMEOUT);
        return config;
    }

    @Test
    public void testNoSlotSharingAndBlockingResultSender() throws Exception {
        JobGraph jobGraph = this.createTestJobGraph(JOB_NAME, 8, 4);
        this.submitJobGraphAndWait(jobGraph);
    }

    @Test
    public void testNoSlotSharingAndBlockingResultReceiver() throws Exception {
        JobGraph jobGraph = this.createTestJobGraph(JOB_NAME, 4, 8);
        this.submitJobGraphAndWait(jobGraph);
    }

    @Test
    public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
        JobGraph jobGraph = this.createTestJobGraph(JOB_NAME, 8, 8);
        this.submitJobGraphAndWait(jobGraph);
    }

    private void submitJobGraphAndWait(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
    }

    private JobGraph createTestJobGraph(String jobName, int senderParallelism, int receiverParallelism) {
        JobVertex sender = new JobVertex("Sender");
        sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
        sender.getConfiguration().get(ConfigurationUtils.getIntConfigOption((String)"number-of-times-to-send"), (Object)receiverParallelism);
        sender.setParallelism(senderParallelism);
        JobVertex receiver = new JobVertex("Receiver");
        receiver.setInvokableClass(SubtaskIndexReceiver.class);
        receiver.getConfiguration().get(ConfigurationUtils.getIntConfigOption((String)"number-of-indexes-to-receive"), (Object)senderParallelism);
        receiver.setParallelism(receiverParallelism);
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        return JobGraphBuilder.newBatchJobGraphBuilder().setJobName(jobName).addJobVertices(Arrays.asList(sender, receiver)).build();
    }

    public static class SubtaskIndexReceiver
    extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-indexes-to-receive";

        public SubtaskIndexReceiver(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            RecordReader reader = new RecordReader((InputGate)this.getEnvironment().getInputGate(0), IntValue.class, this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            try {
                IntValue record;
                int numberOfSubtaskIndexesToReceive = (Integer)this.getTaskConfiguration().get(ConfigurationUtils.getIntConfigOption((String)CONFIG_KEY), (Object)0);
                BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
                int numberOfReceivedSubtaskIndexes = 0;
                while ((record = (IntValue)reader.next()) != null) {
                    if (++numberOfReceivedSubtaskIndexes > numberOfSubtaskIndexesToReceive) {
                        throw new IllegalStateException("Received more records than expected.");
                    }
                    int subtaskIndex = record.getValue();
                    if (receivedSubtaskIndexes.get(subtaskIndex)) {
                        throw new IllegalStateException("Received expected subtask index twice.");
                    }
                    receivedSubtaskIndexes.set(subtaskIndex, true);
                }
                if (receivedSubtaskIndexes.cardinality() != numberOfSubtaskIndexesToReceive) {
                    throw new IllegalStateException("Finished receive, but did not receive all expected subtask indexes.");
                }
            }
            finally {
                reader.clearBuffers();
            }
        }
    }

    public static class RoundRobinSubtaskIndexSender
    extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-times-to-send";

        public RoundRobinSubtaskIndexSender(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            RecordWriter writer = new RecordWriterBuilder().build(this.getEnvironment().getWriter(0));
            int numberOfTimesToSend = (Integer)this.getTaskConfiguration().get(ConfigurationUtils.getIntConfigOption((String)CONFIG_KEY), (Object)0);
            IntValue subtaskIndex = new IntValue(this.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            try {
                for (int i = 0; i < numberOfTimesToSend; ++i) {
                    writer.emit((IOReadableWritable)subtaskIndex);
                }
                writer.flushAll();
            }
            finally {
                writer.close();
            }
        }
    }
}

