/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateTestBase;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalRecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SingleInputGateTest
extends InputGateTestBase {
    /** Package-private no-argument constructor with an empty body. */
    SingleInputGateTest() {
    }

    /**
     * A gate whose single channel was built as an unknown channel must reject {@code
     * checkpointStarted} with a {@link CheckpointException}.
     */
    @Test
    void testCheckpointsDeclinedUnlessAllChannelsAreKnown() {
        SingleInputGate inputGate =
                createInputGate(createNettyShuffleEnvironment(), 1, ResultPartitionType.PIPELINED);
        UnknownInputChannel unknownChannel =
                new InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {unknownChannel});

        // The barrier is built inside the callable so that everything stays under the assertion.
        Assertions.assertThatThrownBy(
                        () -> {
                            CheckpointOptions options =
                                    CheckpointOptions.alignedNoTimeout(
                                            CheckpointType.CHECKPOINT,
                                            CheckpointStorageLocationReference.getDefault());
                            inputGate.checkpointStarted(new CheckpointBarrier(1L, 1L, options));
                        })
                .isInstanceOf(CheckpointException.class);
    }

    /**
     * While the gate's state-consumed future is not yet done, {@code checkpointStarted} must fail
     * with a {@link CheckpointException}.
     */
    @Test
    void testCheckpointsDeclinedUnlessStateConsumed() {
        SingleInputGate inputGate = createInputGate(createNettyShuffleEnvironment());

        // Sanity-check the test's own precondition before exercising the gate.
        boolean stateConsumed = inputGate.getStateConsumedFuture().isDone();
        Preconditions.checkState(!stateConsumed);

        Assertions.assertThatThrownBy(
                        () -> {
                            CheckpointOptions options =
                                    CheckpointOptions.alignedNoTimeout(
                                            CheckpointType.CHECKPOINT,
                                            CheckpointStorageLocationReference.getDefault());
                            inputGate.checkpointStarted(new CheckpointBarrier(1L, 1L, options));
                        })
                .isInstanceOf(CheckpointException.class);
    }

    /**
     * Walks a gate through {@code setup()} and {@code convertRecoveredInputChannels()} and checks
     * the gate's buffer pool and the per-channel available-buffer counts at each stage.
     *
     * @throws Exception if creating, setting up or closing the gate or environment fails
     */
    @Test
    void testSetupLogic() throws Exception {
        NettyShuffleEnvironment shuffleEnvironment = createNettyShuffleEnvironment();
        SingleInputGate gate = createInputGate(shuffleEnvironment);
        try (Closer resources = Closer.create()) {
            resources.register(shuffleEnvironment::close);
            resources.register(gate::close);

            // Stage 1 - before setup: no pool yet, only recovered/unknown channels, no buffers.
            Assertions.assertThat(gate.getBufferPool()).isNull();
            for (InputChannel channel : gate.inputChannels()) {
                boolean isRecovered = channel instanceof RecoveredInputChannel;
                Assertions.assertThat(isRecovered || channel instanceof UnknownInputChannel)
                        .isTrue();
                if (isRecovered) {
                    Assertions.assertThat(
                                    ((RecoveredInputChannel) channel)
                                            .bufferManager.getNumberOfAvailableBuffers())
                            .isEqualTo(0);
                }
            }

            // Stage 2 - after setup: pool exists, recovered channels still hold no buffers.
            gate.setup();
            Assertions.assertThat(gate.getBufferPool()).isNotNull();
            Assertions.assertThat(gate.getBufferPool().getNumberOfRequiredMemorySegments())
                    .isEqualTo(1);
            for (InputChannel channel : gate.inputChannels()) {
                if (channel instanceof RemoteRecoveredInputChannel) {
                    Assertions.assertThat(
                                    ((RemoteRecoveredInputChannel) channel)
                                            .bufferManager.getNumberOfAvailableBuffers())
                            .isEqualTo(0);
                } else if (channel instanceof LocalRecoveredInputChannel) {
                    Assertions.assertThat(
                                    ((LocalRecoveredInputChannel) channel)
                                            .bufferManager.getNumberOfAvailableBuffers())
                            .isEqualTo(0);
                }
            }

            // Stage 3 - after conversion: remote channels expose two available buffers each.
            gate.convertRecoveredInputChannels();
            Assertions.assertThat(gate.getBufferPool()).isNotNull();
            Assertions.assertThat(gate.getBufferPool().getNumberOfRequiredMemorySegments())
                    .isEqualTo(1);
            for (InputChannel channel : gate.inputChannels()) {
                if (channel instanceof RemoteInputChannel) {
                    Assertions.assertThat(
                                    ((RemoteInputChannel) channel).getNumberOfAvailableBuffers())
                            .isEqualTo(2);
                }
            }
        }
    }

    /**
     * After recovered state is consumed and partitions are requested, channel 0 must be a remote
     * channel with a partition request client and an initial credit of 2, channel 1 a local
     * channel with a subpartition view, and channel 2 still an unknown channel.
     *
     * <p>NOTE(review): the channel layout comes from {@code createInputGate(environment)}, which
     * is defined outside this excerpt.
     *
     * @throws Exception if driving or closing the gate fails
     */
    @Test
    void testPartitionRequestLogic() throws Exception {
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate inputGate = createInputGate(shuffleEnvironment);
        try (Closer resources = Closer.create()) {
            resources.register(shuffleEnvironment::close);
            resources.register(inputGate::close);

            // Drain the recovered state first; partitions are only requested afterwards.
            inputGate.finishReadRecoveredState();
            while (!inputGate.getStateConsumedFuture().isDone()) {
                inputGate.pollNext();
            }
            inputGate.requestPartitions();
            inputGate.pollNext();

            InputChannel first = inputGate.getChannel(0);
            Assertions.assertThat(first).isInstanceOf(RemoteInputChannel.class);
            RemoteInputChannel remote = (RemoteInputChannel) first;
            Assertions.assertThat(remote.getPartitionRequestClient()).isNotNull();
            Assertions.assertThat(remote.getInitialCredit()).isEqualTo(2);

            InputChannel second = inputGate.getChannel(1);
            Assertions.assertThat(second).isInstanceOf(LocalInputChannel.class);
            Assertions.assertThat(((LocalInputChannel) second).getSubpartitionView()).isNotNull();

            Assertions.assertThat(inputGate.getChannel(2)).isInstanceOf(UnknownInputChannel.class);
        }
    }

    /**
     * Feeds two test channels with buffers, end-of-data and end-of-partition events and checks
     * the order in which the gate hands them out, together with the gate's end-of-data status and
     * finished flag along the way.
     *
     * <p>NOTE(review): {@code verifyBufferOrEvent} is defined outside this excerpt; its arguments
     * are presumably (gate, expectedIsBuffer, expectedChannelIndex, expectedMoreAvailable).
     *
     * @throws Exception if reading from the gate fails
     */
    @Test
    void testBasicGetNextLogic() throws Exception {
        SingleInputGate inputGate = createInputGate();
        // Must be typed as TestInputChannel[]: the read*/assert* helpers used below are invoked on
        // the elements and are not part of the InputChannel type.
        TestInputChannel[] inputChannels = {
            new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)
        };
        inputGate.setInputChannels(inputChannels);

        // Queue the test data: two buffers on channel 0, one on channel 1, then the end markers.
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer();
        inputChannels[1].readBuffer();
        inputChannels[1].readEndOfData();
        inputChannels[0].readEndOfData();
        inputChannels[1].readEndOfPartitionEvent();
        inputChannels[0].readEndOfPartitionEvent();

        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        inputGate.notifyChannelNonEmpty(inputChannels[1]);

        verifyBufferOrEvent(inputGate, true, 0, true);
        verifyBufferOrEvent(inputGate, true, 1, true);
        verifyBufferOrEvent(inputGate, true, 0, true);

        // Only one of the two channels has delivered its end-of-data event so far.
        verifyBufferOrEvent(inputGate, false, 1, true);
        Assertions.assertThat(inputGate.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA);

        // Both channels reported end-of-data, but end-of-partition is still pending.
        verifyBufferOrEvent(inputGate, false, 0, true);
        Assertions.assertThat(inputGate.isFinished()).isFalse();
        Assertions.assertThat(inputGate.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.DRAINED);

        verifyBufferOrEvent(inputGate, false, 1, true);
        verifyBufferOrEvent(inputGate, false, 0, false);
        Assertions.assertThat(inputGate.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.DRAINED);
        Assertions.assertThat(inputGate.isFinished()).isTrue();

        for (TestInputChannel channel : inputChannels) {
            channel.assertReturnedEventsAreRecycled();
        }
    }

    /**
     * Checks how the gate's end-of-data status is derived from the stop modes of its channels: a
     * gate that received one {@code NO_DRAIN} and one {@code DRAIN} marker must end up {@code
     * STOPPED}, a gate that received only {@code DRAIN} markers must end up {@code DRAINED}.
     * Before the second marker is polled both gates still report {@code NOT_END_OF_DATA}.
     *
     * @throws Exception if reading from the gates fails
     */
    @Test
    void testDrainFlagComputation() throws Exception {
        SingleInputGate inputGate1 = createInputGate();
        SingleInputGate inputGate2 = createInputGate();

        // Typed as TestInputChannel[] because readEndOfData(StopMode) is invoked on the elements.
        TestInputChannel[] inputChannels1 = {
            new TestInputChannel(inputGate1, 0), new TestInputChannel(inputGate1, 1)
        };
        inputGate1.setInputChannels(inputChannels1);
        TestInputChannel[] inputChannels2 = {
            new TestInputChannel(inputGate2, 0), new TestInputChannel(inputGate2, 1)
        };
        inputGate2.setInputChannels(inputChannels2);

        // Gate 1 mixes DRAIN and NO_DRAIN; gate 2 receives DRAIN on both channels.
        inputChannels1[1].readEndOfData(StopMode.DRAIN);
        inputChannels1[0].readEndOfData(StopMode.NO_DRAIN);
        inputChannels2[1].readEndOfData(StopMode.DRAIN);
        inputChannels2[0].readEndOfData(StopMode.DRAIN);

        inputGate1.notifyChannelNonEmpty(inputChannels1[0]);
        inputGate1.notifyChannelNonEmpty(inputChannels1[1]);
        inputGate2.notifyChannelNonEmpty(inputChannels2[0]);
        inputGate2.notifyChannelNonEmpty(inputChannels2[1]);

        verifyBufferOrEvent(inputGate1, false, 0, true);
        Assertions.assertThat(inputGate1.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA);
        verifyBufferOrEvent(inputGate1, false, 1, true);
        Assertions.assertThat(inputGate1.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.STOPPED);

        verifyBufferOrEvent(inputGate2, false, 0, true);
        Assertions.assertThat(inputGate2.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA);
        verifyBufferOrEvent(inputGate2, false, 1, true);
        Assertions.assertThat(inputGate2.hasReceivedEndOfData())
                .isEqualTo(PullingAsyncDataInput.EndOfDataStatus.DRAINED);
    }

    /**
     * Compresses a buffer filled with a known little-endian {@code long} sequence, pushes it
     * through a gate configured with the matching decompressor and verifies that the buffer read
     * back from the gate contains the original sequence.
     *
     * @param compressionCodec name of the codec handed to both the compressor and the decompressor
     * @throws Exception if reading from or closing the gate fails
     */
    @ParameterizedTest
    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
    void testGetCompressedBuffer(String compressionCodec) throws Exception {
        int bufferSize = 1024;
        BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
        BufferDecompressor decompressor = new BufferDecompressor(bufferSize, compressionCodec);

        try (SingleInputGate inputGate =
                new SingleInputGateBuilder().setBufferDecompressor(decompressor).build()) {
            TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);

            // Fill the segment with the byte offset of every 8-byte slot.
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
            for (int offset = 0; offset < bufferSize; offset += 8) {
                segment.putLongLittleEndian(offset, offset);
            }
            NetworkBuffer uncompressedBuffer =
                    new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
            uncompressedBuffer.setSize(bufferSize);

            Buffer compressedBuffer = compressor.compressToOriginalBuffer(uncompressedBuffer);
            Assertions.assertThat(compressedBuffer.isCompressed()).isTrue();

            inputChannel.read(compressedBuffer);
            inputGate.setInputChannels(new InputChannel[] {inputChannel});
            inputGate.notifyChannelNonEmpty(inputChannel);

            // Typed Optional instead of the raw type, so no unchecked casts are needed below.
            Optional<BufferOrEvent> bufferOrEvent = inputGate.getNext();
            Assertions.assertThat(bufferOrEvent.isPresent()).isTrue();
            BufferOrEvent received = bufferOrEvent.get();
            Assertions.assertThat(received.isBuffer()).isTrue();

            ByteBuffer payload =
                    received.getBuffer().getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
            for (int offset = 0; offset < bufferSize; offset += 8) {
                Assertions.assertThat(payload.getLong()).isEqualTo((long) offset);
            }
        }
    }

    /**
     * A channel that notifies the gate again after its end-of-partition event has been polled
     * must not make the gate return anything: the following {@code pollNext()} is empty.
     *
     * @throws Exception if polling the gate fails
     */
    @Test
    void testNotifyAfterEndOfPartition() throws Exception {
        SingleInputGate gate = createInputGate(2);
        TestInputChannel firstChannel = new TestInputChannel(gate, 0);
        TestInputChannel secondChannel = new TestInputChannel(gate, 1);
        gate.setInputChannels(new InputChannel[] {firstChannel, secondChannel});

        firstChannel.readEndOfPartitionEvent();
        firstChannel.notifyChannelNonEmpty();
        Optional<BufferOrEvent> endOfPartition = gate.pollNext();
        Assertions.assertThat(endOfPartition.get().getEvent())
                .isEqualTo(EndOfPartitionEvent.INSTANCE);

        // Notify once more although the channel has nothing left to deliver.
        firstChannel.notifyChannelNonEmpty();
        Optional<BufferOrEvent> afterExtraNotification = gate.pollNext();
        Assertions.assertThat(afterExtraNotification.isPresent()).isFalse();
    }

    /**
     * Runs the inherited three-argument {@code testIsAvailable} check against a gate with a single
     * test channel; the same gate is passed both as the gate under test and as the second
     * argument.
     *
     * @throws Exception if the inherited check fails with an exception
     */
    @Test
    void testIsAvailable() throws Exception {
        SingleInputGate gate = createInputGate(1);
        TestInputChannel onlyChannel = new TestInputChannel(gate, 0);
        gate.setInputChannels(new InputChannel[] {onlyChannel});

        testIsAvailable(gate, gate, onlyChannel);
    }

    /**
     * Runs the inherited two-argument {@code testIsAvailableAfterFinished} check, supplying an
     * action that reads an end-of-partition event on the gate's only channel and then notifies
     * the gate.
     *
     * @throws Exception if the inherited check fails with an exception
     */
    @Test
    void testIsAvailableAfterFinished() throws Exception {
        SingleInputGate gate = createInputGate(1);
        TestInputChannel onlyChannel = new TestInputChannel(gate, 0);
        gate.setInputChannels(new InputChannel[] {onlyChannel});

        testIsAvailableAfterFinished(
                gate,
                () -> {
                    onlyChannel.readEndOfPartitionEvent();
                    gate.notifyChannelNonEmpty(onlyChannel);
                });
    }

    /**
     * With two channels of which only channel 0 has data (one buffer followed by an
     * end-of-partition event), the gate must hand out the buffer and then the event, both from
     * channel 0.
     *
     * <p>NOTE(review): {@code verifyBufferOrEvent} is defined outside this excerpt; its last
     * argument presumably is the expected "more available" flag.
     *
     * @throws Exception if reading from the gate fails
     */
    @Test
    void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception {
        SingleInputGate inputGate = createInputGate();
        // Typed as TestInputChannel[] because readBuffer()/readEndOfPartitionEvent() are invoked
        // on the elements.
        TestInputChannel[] inputChannels = {
            new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)
        };
        inputGate.setInputChannels(inputChannels);

        inputChannels[0].readBuffer();
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0]);

        verifyBufferOrEvent(inputGate, true, 0, true);
        verifyBufferOrEvent(inputGate, false, 0, false);
    }

    /**
     * Sets up a gate with one local and one unknown channel, sends a task event, then updates the
     * unknown channel and checks the counters of the testing partition manager and the testing
     * task event publisher after each step (1/1 after the event, 2/2 after the update).
     *
     * <p>NOTE(review): both testing helpers are declared outside this excerpt; what exactly their
     * {@code counter} fields count should be confirmed there.
     *
     * @throws Exception if setting up, driving or closing the gate fails
     */
    @Test
    void testBackwardsEventWithUninitializedChannel() throws Exception {
        TestingTaskEventPublisher eventPublisher = new TestingTaskEventPublisher();
        TestingResultPartitionManager resultPartitionManager =
                new TestingResultPartitionManager(new NoOpResultSubpartitionView());

        NettyShuffleEnvironment shuffleEnvironment = createNettyShuffleEnvironment();
        SingleInputGate gate = createInputGate(shuffleEnvironment, 2, ResultPartitionType.PIPELINED);
        InputChannel[] channels = new InputChannel[2];

        try (Closer resources = Closer.create()) {
            resources.register(shuffleEnvironment::close);
            resources.register(gate::close);

            // Channel 0: local channel.
            ResultPartitionID localPartitionId = new ResultPartitionID();
            channels[0] =
                    InputChannelBuilder.newBuilder()
                            .setPartitionId(localPartitionId)
                            .setPartitionManager(resultPartitionManager)
                            .setTaskEventPublisher(eventPublisher)
                            .buildLocalChannel(gate);

            // Channel 1: unknown channel, resolved further below.
            ResultPartitionID unknownPartitionId = new ResultPartitionID();
            channels[1] =
                    InputChannelBuilder.newBuilder()
                            .setChannelIndex(1)
                            .setPartitionId(unknownPartitionId)
                            .setPartitionManager(resultPartitionManager)
                            .setTaskEventPublisher(eventPublisher)
                            .buildUnknownChannel(gate);

            InputGateFairnessTest.setupInputGate(gate, channels);
            Assertions.assertThat(resultPartitionManager.counter).isEqualTo(1);

            TestTaskEvent taskEvent = new TestTaskEvent();
            gate.sendTaskEvent(taskEvent);
            Assertions.assertThat(eventPublisher.counter).isEqualTo(1);

            // The same ResourceID is used as local location and as the descriptor's location.
            ResourceID location = ResourceID.generate();
            gate.updateInputChannel(
                    location,
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            unknownPartitionId.getPartitionId(), location));
            Assertions.assertThat(resultPartitionManager.counter).isEqualTo(2);
            Assertions.assertThat(eventPublisher.counter).isEqualTo(2);
        }
    }

    /**
     * Updating an unknown channel on a gate that was never asked to request partitions must leave
     * the testing partition manager's counter at 0.
     *
     * @throws Exception if updating the channel fails
     */
    @Test
    void testUpdateChannelBeforeRequest() throws Exception {
        SingleInputGate gate = createInputGate(1);
        TestingResultPartitionManager resultPartitionManager =
                new TestingResultPartitionManager(new NoOpResultSubpartitionView());

        UnknownInputChannel unknownChannel =
                InputChannelBuilder.newBuilder()
                        .setPartitionManager(resultPartitionManager)
                        .buildUnknownChannel(gate);
        gate.setInputChannels(new InputChannel[] {unknownChannel});

        ResultPartitionID partitionId = unknownChannel.getPartitionId();
        // The same ResourceID is used as local location and as the descriptor's location.
        ResourceID location = ResourceID.generate();
        gate.updateInputChannel(
                location,
                NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                        partitionId.getPartitionId(), location));

        Assertions.assertThat(resultPartitionManager.counter).isEqualTo(0);
    }

    /**
     * Updates an unknown channel with a local shuffle descriptor whose {@link ResultPartitionID}
     * shares the partition id of the old one but carries a fresh random attempt id, and checks
     * that the replacement is a {@link LocalInputChannel} holding the new partition id.
     *
     * @throws Exception if updating the channel fails
     */
    @Test
    void testUpdateLocalInputChannelWithNewPartitionId() throws Exception {
        SingleInputGate gate = createInputGate(1);
        TestingResultPartitionManager resultPartitionManager =
                new TestingResultPartitionManager(new NoOpResultSubpartitionView());

        ResultPartitionID oldPartitionId = new ResultPartitionID();
        UnknownInputChannel unknownChannel =
                InputChannelBuilder.newBuilder()
                        .setPartitionManager(resultPartitionManager)
                        .setPartitionId(oldPartitionId)
                        .buildUnknownChannel(gate);
        gate.setInputChannels(new InputChannel[] {unknownChannel});

        ResultPartitionID currentPartitionId = unknownChannel.getPartitionId();
        Assertions.assertThat(currentPartitionId).isEqualTo(oldPartitionId);

        ResultPartitionID newPartitionId =
                new ResultPartitionID(
                        oldPartitionId.getPartitionId(), ExecutionAttemptID.randomId());
        ResourceID location = ResourceID.generate();
        NettyShuffleDescriptor localDescriptor =
                NettyShuffleDescriptorBuilder.newBuilder()
                        .setId(newPartitionId)
                        .setProducerLocation(location)
                        .buildLocal();
        gate.updateInputChannel(location, localDescriptor);

        InputChannel replacement = gate.getChannel(0);
        Assertions.assertThat(replacement).isInstanceOf(LocalInputChannel.class);
        Assertions.assertThat(replacement.partitionId).isEqualTo(newPartitionId);
    }

    /**
     * Starts a thread that calls {@code getNext()} on a gate with a single unknown channel, waits
     * (up to 50 polls, 100 ms apart) until that thread is observed in state {@code WAITING},
     * closes the gate and expects the thread to have failed with an {@link
     * IllegalStateException}.
     *
     * @throws Exception if creating or closing the gate fails or the test thread is interrupted
     */
    @Test
    void testReleaseWhilePollingChannel() throws Exception {
        // Typed reference: the consumer thread only ever stores the exception it caught.
        AtomicReference<Exception> asyncException = new AtomicReference<>();

        SingleInputGate inputGate = createInputGate(1);
        UnknownInputChannel unknownChannel =
                InputChannelBuilder.newBuilder().buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {unknownChannel});

        Thread asyncConsumer =
                new Thread(
                        () -> {
                            try {
                                inputGate.getNext();
                            } catch (Exception e) {
                                asyncException.set(e);
                            }
                        });
        asyncConsumer.start();

        // Poll until the consumer thread is parked inside getNext(), or give up after 50 tries.
        boolean success = false;
        for (int attempt = 0; attempt < 50; attempt++) {
            if (asyncConsumer.isAlive()) {
                success = asyncConsumer.getState() == Thread.State.WAITING;
            }
            if (success) {
                break;
            }
            Thread.sleep(100L);
        }
        Assertions.assertThat(success).as("Did not trigger blocking buffer request.").isTrue();

        // Closing the gate must make the waiting getNext() call fail.
        inputGate.close();
        asyncConsumer.join();

        Assertions.assertThat(asyncException.get()).isNotNull();
        Assertions.assertThat(asyncException.get().getClass())
                .isEqualTo(IllegalStateException.class);
    }

    /**
     * Builds a gate over three partitions with an initial backoff of 137, a partition request
     * timeout of 600 and a maximum backoff of 1001, and checks the backoff sequence of each
     * channel kind:
     *
     * <ul>
     *   <li>local and unknown channel: 0, 137, 274, 548, 1001, then no further increase;
     *   <li>remote channel: 0, 600, 1200, 1800, then no further increase.
     * </ul>
     *
     * <p>NOTE(review): which partition maps to which channel kind is decided by {@code
     * createSingleInputGate}, which is defined outside this excerpt.
     *
     * @throws Exception if driving or closing the gate fails, or a channel reports an error
     */
    @Test
    void testRequestBackoffConfiguration() throws Exception {
        IntermediateResultPartitionID[] partitionIds = {
            new IntermediateResultPartitionID(),
            new IntermediateResultPartitionID(),
            new IntermediateResultPartitionID()
        };
        int initialBackoff = 137;
        int partitionRequestTimeout = 600;
        int maxBackoff = 1001;

        NettyShuffleEnvironment netEnv =
                new NettyShuffleEnvironmentBuilder()
                        .setPartitionRequestInitialBackoff(initialBackoff)
                        .setPartitionRequestTimeout(partitionRequestTimeout)
                        .setPartitionRequestMaxBackoff(maxBackoff)
                        .build();
        SingleInputGate gate =
                createSingleInputGate(partitionIds, ResultPartitionType.PIPELINED, netEnv);
        gate.setChannelStateWriter(ChannelStateWriter.NO_OP);

        // Consume the recovered state so the recovered channels can be converted.
        gate.finishReadRecoveredState();
        while (!gate.getStateConsumedFuture().isDone()) {
            gate.pollNext();
        }
        gate.convertRecoveredInputChannels();

        try (Closer closer = Closer.create()) {
            closer.register(netEnv::close);
            closer.register(gate::close);

            Assertions.assertThat(gate.getConsumedPartitionType())
                    .isEqualTo(ResultPartitionType.PIPELINED);

            // Wildcard key type: only the InputChannel values matter here. With a raw Map the
            // values would be plain Objects and checkError() could not be called on them.
            Map<?, InputChannel> channelMap = gate.getInputChannels();
            Assertions.assertThat(channelMap.size()).isEqualTo(3);
            for (InputChannel channel : channelMap.values()) {
                channel.checkError();
            }

            InputChannel localChannel = getTheOnlyInputChannelInPartition(gate, partitionIds[0]);
            Assertions.assertThat(localChannel.getClass()).isEqualTo(LocalInputChannel.class);
            InputChannel unknownChannel = getTheOnlyInputChannelInPartition(gate, partitionIds[2]);
            Assertions.assertThat(unknownChannel.getClass()).isEqualTo(UnknownInputChannel.class);

            // Local and unknown channels double the initial backoff until the maximum is hit.
            InputChannel[] doublingChannels = {localChannel, unknownChannel};
            for (InputChannel ch : doublingChannels) {
                Assertions.assertThat(ch.getCurrentBackoff()).isEqualTo(0);
                Assertions.assertThat(ch.increaseBackoff()).isTrue();
                Assertions.assertThat(ch.getCurrentBackoff()).isEqualTo(initialBackoff);
                Assertions.assertThat(ch.increaseBackoff()).isTrue();
                Assertions.assertThat(ch.getCurrentBackoff()).isEqualTo(initialBackoff * 2);
                Assertions.assertThat(ch.increaseBackoff()).isTrue();
                Assertions.assertThat(ch.getCurrentBackoff()).isEqualTo(initialBackoff * 2 * 2);
                Assertions.assertThat(ch.increaseBackoff()).isTrue();
                Assertions.assertThat(ch.getCurrentBackoff()).isEqualTo(maxBackoff);
                Assertions.assertThat(ch.increaseBackoff()).isFalse();
            }

            // The remote channel advances in multiples of the partition request timeout.
            InputChannel remoteChannel = getTheOnlyInputChannelInPartition(gate, partitionIds[1]);
            Assertions.assertThat(remoteChannel.getClass()).isEqualTo(RemoteInputChannel.class);
            Assertions.assertThat(remoteChannel.getCurrentBackoff()).isEqualTo(0);
            Assertions.assertThat(remoteChannel.increaseBackoff()).isTrue();
            Assertions.assertThat(remoteChannel.getCurrentBackoff())
                    .isEqualTo(partitionRequestTimeout);
            Assertions.assertThat(remoteChannel.increaseBackoff()).isTrue();
            Assertions.assertThat(remoteChannel.getCurrentBackoff())
                    .isEqualTo(partitionRequestTimeout * 2);
            Assertions.assertThat(remoteChannel.increaseBackoff()).isTrue();
            Assertions.assertThat(remoteChannel.getCurrentBackoff())
                    .isEqualTo(partitionRequestTimeout * 3);
            Assertions.assertThat(remoteChannel.increaseBackoff()).isFalse();
        }
    }

    /**
     * Sets up a {@code PIPELINED_BOUNDED} gate with one remote channel and checks that the
     * channel holds 2 available buffers, that the network buffer pool has handed out those 2
     * segments plus 1 more, and that {@code countBuffers()} on the pool returns 8.
     *
     * @throws Exception if setting up or closing the gate or environment fails
     */
    @Test
    void testRequestBuffersWithRemoteInputChannel() throws Exception {
        NettyShuffleEnvironment shuffleEnvironment = createNettyShuffleEnvironment();
        SingleInputGate gate =
                createInputGate(shuffleEnvironment, 1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;

        try (Closer resources = Closer.create()) {
            resources.register(shuffleEnvironment::close);
            resources.register(gate::close);

            RemoteInputChannel remoteChannel =
                    InputChannelBuilder.newBuilder()
                            .setupFromNettyShuffleEnvironment(shuffleEnvironment)
                            .setConnectionManager(new TestingConnectionManager())
                            .buildRemoteChannel(gate);
            gate.setInputChannels(new InputChannel[] {remoteChannel});
            gate.setup();

            NetworkBufferPool globalPool = shuffleEnvironment.getNetworkBufferPool();
            Assertions.assertThat(remoteChannel.getNumberOfAvailableBuffers())
                    .isEqualTo(buffersPerChannel);
            Assertions.assertThat(globalPool.getNumberOfAvailableMemorySegments())
                    .isEqualTo(globalPool.getTotalNumberOfMemorySegments() - buffersPerChannel - 1);
            Assertions.assertThat(globalPool.countBuffers()).isEqualTo(extraNetworkBuffersPerGate);
        }
    }

    /**
     * Sets up a {@code PIPELINED_BOUNDED} gate with one unknown channel and checks the network
     * buffer pool before and after the channel is updated to a remote one: before the update only
     * 1 segment is taken from the pool, afterwards the remote channel holds 2 available buffers
     * and the pool is short of those 2 segments plus 1. {@code countBuffers()} returns 8 both
     * times.
     *
     * @throws Exception if setting up, updating or closing the gate or environment fails
     */
    @Test
    void testRequestBuffersWithUnknownInputChannel() throws Exception {
        NettyShuffleEnvironment shuffleEnvironment = createNettyShuffleEnvironment();
        SingleInputGate gate =
                createInputGate(shuffleEnvironment, 1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;

        try (Closer resources = Closer.create()) {
            resources.register(shuffleEnvironment::close);
            resources.register(gate::close);

            ResultPartitionID partitionId = new ResultPartitionID();
            InputChannel unknownChannel =
                    buildUnknownInputChannel(shuffleEnvironment, gate, partitionId, 0);
            gate.setInputChannels(new InputChannel[] {unknownChannel});
            gate.setup();

            // Before the update: the unknown channel has not taken any per-channel buffers.
            NetworkBufferPool globalPool = shuffleEnvironment.getNetworkBufferPool();
            Assertions.assertThat(globalPool.getNumberOfAvailableMemorySegments())
                    .isEqualTo(globalPool.getTotalNumberOfMemorySegments() - 1);
            Assertions.assertThat(globalPool.countBuffers()).isEqualTo(extraNetworkBuffersPerGate);

            // Two distinct ResourceIDs: one as local location, one inside the descriptor.
            gate.updateInputChannel(
                    ResourceID.generate(),
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            partitionId.getPartitionId(), ResourceID.generate()));

            // After the update: the channel is remote and owns its per-channel buffers.
            RemoteInputChannel remoteChannel =
                    (RemoteInputChannel) getTheOnlyInputChannelInPartition(gate, partitionId);
            Assertions.assertThat(remoteChannel.getNumberOfAvailableBuffers())
                    .isEqualTo(buffersPerChannel);
            Assertions.assertThat(globalPool.getNumberOfAvailableMemorySegments())
                    .isEqualTo(globalPool.getTotalNumberOfMemorySegments() - buffersPerChannel - 1);
            Assertions.assertThat(globalPool.countBuffers()).isEqualTo(extraNetworkBuffersPerGate);
        }
    }

    /**
     * Verifies that {@code updateInputChannel} replaces unknown channels one at a time.
     *
     * <p>Both channels start as {@link UnknownInputChannel}. Updating with a descriptor whose
     * location differs from the gate-side location must yield a {@link RemoteInputChannel} and
     * leave the other channel untouched; updating with a descriptor at the same location must
     * yield a {@link LocalInputChannel}.
     */
    @Test
    void testUpdateUnknownInputChannel() throws Exception {
        final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
        final ResultPartition localResultPartition = new ResultPartitionBuilder()
                .setResultPartitionManager(network.getResultPartitionManager())
                .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
                .build();
        final ResultPartition remoteResultPartition = new ResultPartitionBuilder()
                .setResultPartitionManager(network.getResultPartitionManager())
                .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
                .build();
        localResultPartition.setup();
        remoteResultPartition.setup();
        final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
        try (Closer closer = Closer.create()) {
            closer.register(network::close);
            closer.register(inputGate::close);

            final ResultPartitionID localId = localResultPartition.getPartitionId();
            final ResultPartitionID remoteId = remoteResultPartition.getPartitionId();
            final InputChannel[] unknownChannels = {
                buildUnknownInputChannel(network, inputGate, localId, 0),
                buildUnknownInputChannel(network, inputGate, remoteId, 1)
            };
            inputGate.setInputChannels(unknownChannels);
            inputGate.setup();

            Assertions.assertThat(getTheOnlyInputChannelInPartition(inputGate, remoteId))
                    .isInstanceOf(UnknownInputChannel.class);
            Assertions.assertThat(getTheOnlyInputChannelInPartition(inputGate, localId))
                    .isInstanceOf(UnknownInputChannel.class);

            final ResourceID localLocation = ResourceID.generate();

            // Descriptor located at a different, freshly generated resource.
            inputGate.updateInputChannel(
                    localLocation,
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            remoteId.getPartitionId(), ResourceID.generate()));
            Assertions.assertThat(getTheOnlyInputChannelInPartition(inputGate, remoteId))
                    .isInstanceOf(RemoteInputChannel.class);
            Assertions.assertThat(getTheOnlyInputChannelInPartition(inputGate, localId))
                    .isInstanceOf(UnknownInputChannel.class);

            // Descriptor located at the same resource that is passed as the local location.
            inputGate.updateInputChannel(
                    localLocation,
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            localId.getPartitionId(), localLocation));
            Assertions.assertThat(getTheOnlyInputChannelInPartition(inputGate, remoteId))
                    .isInstanceOf(RemoteInputChannel.class);
            Assertions.assertThat(getTheOnlyInputChannelInPartition(inputGate, localId))
                    .isInstanceOf(LocalInputChannel.class);
        }
    }

    /**
     * Exercises a gate that consumes the subpartition range {@code [0, 1]} of three partitions.
     *
     * <p>The test checks that each partition contributes one channel per subpartition index, that
     * the channels start as local-recovered / remote-recovered / unknown, that they are converted
     * to local / remote / unknown once recovered state has been consumed and partitions were
     * requested, and that updating the third partition with a descriptor at the local location
     * turns its channels into local channels.
     *
     * <p>The shuffle environment and the gate are managed by try-with-resources so that they are
     * closed even when an assertion fails; the gate is closed before the environment.
     */
    @Test
    void testSingleInputGateWithSubpartitionIndexRange() throws IOException, InterruptedException {
        IntermediateResultPartitionID[] partitionIds = new IntermediateResultPartitionID[]{new IntermediateResultPartitionID(), new IntermediateResultPartitionID(), new IntermediateResultPartitionID()};
        IndexRange subpartitionIndexRange = new IndexRange(0, 1);
        ResourceID localLocation = ResourceID.generate();
        try (NettyShuffleEnvironment netEnv = new NettyShuffleEnvironmentBuilder().build();
                SingleInputGate gate = SingleInputGateTest.createSingleInputGate(partitionIds, ResultPartitionType.BLOCKING, subpartitionIndexRange, netEnv, localLocation, new TestingConnectionManager(), new TestingResultPartitionManager(new NoOpResultSubpartitionView()))) {
            for (InputChannel channel : gate.inputChannels()) {
                if (channel instanceof ChannelStateHolder) {
                    ((ChannelStateHolder) channel).setChannelStateWriter(ChannelStateWriter.NO_OP);
                }
            }

            // Every partition must be consumed through one channel per subpartition index.
            for (IntermediateResultPartitionID partitionId : partitionIds) {
                Assertions.assertThat(
                                SingleInputGateTest.getInputChannelsInPartition(gate, partitionId).stream()
                                        .map(InputChannel::getConsumedSubpartitionIndexSet)
                                        .collect(Collectors.toList()))
                        .containsExactlyInAnyOrder(new ResultSubpartitionIndexSet(0), new ResultSubpartitionIndexSet(1));
            }

            this.assertChannelsType(gate, LocalRecoveredInputChannel.class, partitionIds[0]);
            this.assertChannelsType(gate, RemoteRecoveredInputChannel.class, partitionIds[1]);
            this.assertChannelsType(gate, UnknownInputChannel.class, partitionIds[2]);

            gate.setup();
            Assertions.assertThat(gate.getBufferPool()).isNotNull();
            Assertions.assertThat(gate.getBufferPool().getNumberOfRequiredMemorySegments()).isEqualTo(1);

            // Drain the gate until the recovered state is reported as consumed.
            gate.finishReadRecoveredState();
            while (!gate.getStateConsumedFuture().isDone()) {
                gate.pollNext();
            }
            gate.requestPartitions();
            gate.pollNext();

            this.assertChannelsType(gate, LocalInputChannel.class, partitionIds[0]);
            this.assertChannelsType(gate, RemoteInputChannel.class, partitionIds[1]);
            this.assertChannelsType(gate, UnknownInputChannel.class, partitionIds[2]);

            for (InputChannel inputChannel : gate.inputChannels()) {
                if (inputChannel instanceof RemoteInputChannel) {
                    RemoteInputChannel remoteChannel = (RemoteInputChannel) inputChannel;
                    Assertions.assertThat(remoteChannel.getPartitionRequestClient()).isNotNull();
                    Assertions.assertThat(remoteChannel.getInitialCredit()).isEqualTo(2);
                } else if (inputChannel instanceof LocalInputChannel) {
                    Assertions.assertThat(((LocalInputChannel) inputChannel).getSubpartitionView()).isNotNull();
                }
            }

            // Resolve the third partition with a descriptor located at the local resource.
            gate.updateInputChannel(localLocation, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[2], localLocation));
            this.assertChannelsType(gate, LocalInputChannel.class, partitionIds[0]);
            this.assertChannelsType(gate, RemoteInputChannel.class, partitionIds[1]);
            this.assertChannelsType(gate, LocalInputChannel.class, partitionIds[2]);
        }
    }

    /**
     * Asserts that every input channel the gate holds for {@code partitionID} is an instance of
     * {@code clazz}. Passes trivially when the gate has no channel for that partition.
     */
    private void assertChannelsType(SingleInputGate gate, Class<?> clazz, IntermediateResultPartitionID partitionID) {
        SingleInputGateTest.getInputChannelsInPartition(gate, partitionID)
                .forEach(channel -> Assertions.assertThat(channel).isInstanceOf(clazz));
    }

    /**
     * Verifies that {@code getNumberOfQueuedBuffers()} counts buffers queued in both a remote and
     * a local channel: one after a buffer is handed to the remote channel, two after a record is
     * emitted into the result partition read by the local channel.
     */
    @Test
    void testQueuedBuffers() throws Exception {
        final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
        final BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
                .setResultPartitionManager(network.getResultPartitionManager())
                .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
                .build();
        final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
        final ResultPartitionID localResultPartitionId = resultPartition.getPartitionId();

        // Note: the remote channel carries channel index 1, the local channel index 0.
        final RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder()
                .setChannelIndex(1)
                .setupFromNettyShuffleEnvironment(network)
                .setConnectionManager(new TestingConnectionManager())
                .buildRemoteChannel(inputGate);
        final InputChannel localInputChannel = InputChannelBuilder.newBuilder()
                .setChannelIndex(0)
                .setPartitionId(localResultPartitionId)
                .setupFromNettyShuffleEnvironment(network)
                .setConnectionManager(new TestingConnectionManager())
                .buildLocalChannel(inputGate);
        final InputChannel[] inputChannels = {remoteInputChannel, localInputChannel};

        try (Closer closer = Closer.create()) {
            closer.register(network::close);
            closer.register(inputGate::close);
            closer.register(resultPartition::release);

            resultPartition.setup();
            InputGateFairnessTest.setupInputGate(inputGate, inputChannels);

            remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0, 0);
            Assertions.assertThat(inputGate.getNumberOfQueuedBuffers()).isEqualTo(1);

            resultPartition.emitRecord(ByteBuffer.allocate(1), 0);
            Assertions.assertThat(inputGate.getNumberOfQueuedBuffers()).isEqualTo(2);
        }
    }

    /**
     * Verifies that a {@link PartitionNotFoundException} set as the error of a local channel is
     * surfaced by {@code getNext()} on the gate and still carries the channel's partition id.
     */
    @Test
    void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        ResultPartitionID partitionId = localChannel.getPartitionId();
        inputGate.setInputChannels(new InputChannel[]{localChannel});
        localChannel.setError(new PartitionNotFoundException(partitionId));

        Assertions.assertThatThrownBy(inputGate::getNext)
                .isInstanceOfSatisfying(
                        PartitionNotFoundException.class,
                        notFoundException ->
                                Assertions.assertThat(notFoundException.getPartitionId()).isEqualTo(partitionId));
    }

    /**
     * Calls {@code announceBufferSize} at four points: with both channels live, after the local
     * channel released its resources, after the remote channel released its resources, and after
     * the gate itself was closed. The test has no explicit assertions; it passes when none of
     * these calls throws.
     */
    @Test
    void testAnnounceBufferSize() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(2);
        final TestingResultPartitionManager partitionManager =
                new TestingResultPartitionManager(InputChannelTestUtils.createResultSubpartitionView(new BufferConsumer[0]));
        final LocalInputChannel local = InputChannelTestUtils.createLocalInputChannel(gate, partitionManager);
        final RemoteInputChannel remote = InputChannelTestUtils.createRemoteInputChannel(gate, 1);
        gate.setInputChannels(new InputChannel[] {local, remote});
        gate.requestPartitions();

        // Both channels live.
        gate.announceBufferSize(10);

        // Local channel released.
        local.releaseAllResources();
        gate.announceBufferSize(11);

        // Remote channel released as well.
        remote.releaseAllResources();
        gate.announceBufferSize(12);

        // Gate closed.
        gate.close();
        gate.announceBufferSize(13);
    }

    /**
     * Verifies that an input gate can be looked up in the shuffle environment by its id while it
     * is open and that the lookup returns empty once the gate has been closed.
     */
    @Test
    void testInputGateRemovalFromNettyShuffleEnvironment() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        try (Closer closer = Closer.create()) {
            closer.register(network::close);
            int numberOfGates = 10;
            Map<InputGateID, SingleInputGate> createdInputGatesById = SingleInputGateTest.createInputGateWithLocalChannels(network, numberOfGates, 1);
            Assertions.assertThat(createdInputGatesById.size()).isEqualTo(numberOfGates);

            // Iterate over entries so that id and gate come from a single lookup.
            for (Map.Entry<InputGateID, SingleInputGate> idAndGate : createdInputGatesById.entrySet()) {
                InputGateID id = idAndGate.getKey();
                Assertions.assertThat(network.getInputGate(id).isPresent()).isTrue();
                idAndGate.getValue().close();
                Assertions.assertThat(network.getInputGate(id).isPresent()).isFalse();
            }
        }
    }

    /**
     * Verifies that each channel's {@link InputChannelInfo} reports the index of the gate it
     * belongs to and a channel index that matches its position in the gate's iteration order.
     */
    @Test
    void testSingleInputGateInfo() {
        final int numSingleInputGates = 2;
        final int numInputChannels = 3;
        for (int gateIndex = 0; gateIndex < numSingleInputGates; ++gateIndex) {
            SingleInputGate gate = new SingleInputGateBuilder()
                    .setSingleInputGateIndex(gateIndex)
                    .setNumberOfChannels(numInputChannels)
                    .build();
            int expectedChannelIndex = 0;
            for (InputChannel inputChannel : gate.inputChannels()) {
                InputChannelInfo channelInfo = inputChannel.getChannelInfo();
                Assertions.assertThat(channelInfo.getGateIdx()).isEqualTo(gateIndex);
                Assertions.assertThat(channelInfo.getInputChannelIdx()).isEqualTo(expectedChannelIndex++);
            }
        }
    }

    /**
     * Verifies that {@code getUnfinishedChannels()} shrinks as channels deliver their
     * end-of-partition event: all three channels are listed initially, and each channel drops out
     * of the list after its end-of-partition event has been read from the gate.
     */
    @Test
    void testGetUnfinishedChannels() throws IOException, InterruptedException {
        SingleInputGate inputGate = new SingleInputGateBuilder().setSingleInputGateIndex(1).setNumberOfChannels(3).build();
        // Typed as TestInputChannel[] because readEndOfPartitionEvent() is called on the elements.
        TestInputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1), new TestInputChannel(inputGate, 2)};
        inputGate.setInputChannels(inputChannels);
        Assertions.assertThat(inputGate.getUnfinishedChannels()).isEqualTo(Arrays.asList(inputChannels[0].getChannelInfo(), inputChannels[1].getChannelInfo(), inputChannels[2].getChannelInfo()));

        // Finish channel 1.
        inputChannels[1].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[1]);
        inputGate.getNext();
        Assertions.assertThat(inputGate.getUnfinishedChannels()).isEqualTo(Arrays.asList(inputChannels[0].getChannelInfo(), inputChannels[2].getChannelInfo()));

        // Finish channel 0.
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        inputGate.getNext();
        Assertions.assertThat(inputGate.getUnfinishedChannels()).isEqualTo(Collections.singletonList(inputChannels[2].getChannelInfo()));

        // Finish channel 2; nothing remains unfinished.
        inputChannels[2].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[2]);
        inputGate.getNext();
        Assertions.assertThat(inputGate.getUnfinishedChannels()).isEqualTo(Collections.emptyList());
    }

    /**
     * Verifies that {@code getBuffersInUseCount()} starts at zero and grows by one for every
     * {@code readBuffer()} call made on any of the gate's test channels.
     */
    @Test
    void testBufferInUseCount() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        // Typed as TestInputChannel[] because readBuffer() is called on the elements.
        TestInputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels(inputChannels);
        Assertions.assertThat(inputGate.getBuffersInUseCount()).isEqualTo(0);
        inputChannels[0].readBuffer();
        Assertions.assertThat(inputGate.getBuffersInUseCount()).isEqualTo(1);
        inputChannels[0].readBuffer();
        Assertions.assertThat(inputGate.getBuffersInUseCount()).isEqualTo(2);
        inputChannels[1].readBuffer();
        Assertions.assertThat(inputGate.getBuffersInUseCount()).isEqualTo(3);
    }

    /**
     * Runs {@code verifyBuffersInBufferPool} for subpartition range sizes 2 and 500, each first
     * in pipelined and then in non-pipelined mode.
     */
    @Test
    void testCalculateInputGateNetworkBuffers() throws Exception {
        for (int subpartitionRangeSize : new int[] {2, 500}) {
            for (boolean pipelined : new boolean[] {true, false}) {
                SingleInputGateTest.verifyBuffersInBufferPool(pipelined, subpartitionRangeSize);
            }
        }
    }

    /**
     * Builds a gate over three partitions with {@code subpartitionRandSize} subpartitions each and
     * checks the required and maximum number of memory segments of its buffer pool.
     *
     * <p>The environment is configured with a max-required-buffers-per-gate limit of
     * {@link Integer#MAX_VALUE} for pipelined gates and 1000 otherwise. When the target total
     * ({@code 2 * channels + 8}) reaches that limit, the pool is expected to require the limit and
     * allow the target total; otherwise it is expected to require 1 segment and allow 8.
     *
     * <p>The environment and the gate are managed by try-with-resources so that they are closed
     * after every invocation, including when an assertion fails.
     *
     * @param isPipeline whether to create a {@code PIPELINED} (true) or {@code BLOCKING} gate
     * @param subpartitionRandSize number of subpartitions consumed per partition
     */
    private static void verifyBuffersInBufferPool(boolean isPipeline, int subpartitionRandSize) throws Exception {
        IntermediateResultPartitionID[] partitionIds = new IntermediateResultPartitionID[]{new IntermediateResultPartitionID(), new IntermediateResultPartitionID(), new IntermediateResultPartitionID()};
        IndexRange subpartitionIndexRange = new IndexRange(0, subpartitionRandSize - 1);
        int expectMaxRequiredBuffersPerGate = isPipeline ? Integer.MAX_VALUE : 1000;
        NettyShuffleEnvironmentBuilder nettyShuffleEnvironmentBuilder = new NettyShuffleEnvironmentBuilder();
        nettyShuffleEnvironmentBuilder.setMaxRequiredBuffersPerGate(Optional.of(expectMaxRequiredBuffersPerGate));
        try (NettyShuffleEnvironment netEnv = nettyShuffleEnvironmentBuilder.build();
                SingleInputGate gate = SingleInputGateTest.createSingleInputGate(partitionIds, isPipeline ? ResultPartitionType.PIPELINED : ResultPartitionType.BLOCKING, subpartitionIndexRange, netEnv, ResourceID.generate(), new TestingConnectionManager(), new TestingResultPartitionManager(new NoOpResultSubpartitionView()))) {
            gate.setup();

            // Only channels that already are RemoteInputChannel instances are checked here.
            for (InputChannel inputChannel : gate.inputChannels()) {
                if (inputChannel instanceof RemoteInputChannel) {
                    Assertions.assertThat(((RemoteInputChannel) inputChannel).getInitialCredit()).isEqualTo(0);
                }
            }

            int targetTotalBuffersPerGate = 2 * partitionIds.length * subpartitionRandSize + 8;
            int requiredFloatingBuffersPerGate;
            int totalFloatingBuffersPerGate;
            if (targetTotalBuffersPerGate >= expectMaxRequiredBuffersPerGate) {
                requiredFloatingBuffersPerGate = expectMaxRequiredBuffersPerGate;
                totalFloatingBuffersPerGate = targetTotalBuffersPerGate;
            } else {
                requiredFloatingBuffersPerGate = 1;
                totalFloatingBuffersPerGate = 8;
            }
            Assertions.assertThat(gate.getBufferPool().getNumberOfRequiredMemorySegments()).isEqualTo(requiredFloatingBuffersPerGate);
            Assertions.assertThat(gate.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(totalFloatingBuffersPerGate);
        }
    }

    /**
     * Collects the gate's input channels whose map key has {@code resultPartitionId} as its first
     * tuple field.
     */
    private static List<InputChannel> getInputChannelsInPartition(SingleInputGate inputGate, IntermediateResultPartitionID resultPartitionId) {
        return inputGate.getInputChannels().entrySet().stream()
                .filter(entry -> {
                    Tuple2<?, ?> key = (Tuple2<?, ?>) entry.getKey();
                    return key.f0.equals(resultPartitionId);
                })
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    /**
     * Convenience overload that unwraps the partition id from a {@code ResultPartitionID} and
     * delegates to the overload taking that unwrapped id.
     */
    private static InputChannel getTheOnlyInputChannelInPartition(SingleInputGate inputGate, ResultPartitionID resultPartitionId) {
        return getTheOnlyInputChannelInPartition(inputGate, resultPartitionId.getPartitionId());
    }

    /**
     * Returns the single input channel the gate holds for the given partition, asserting that
     * there is exactly one.
     */
    private static InputChannel getTheOnlyInputChannelInPartition(SingleInputGate inputGate, IntermediateResultPartitionID resultPartitionId) {
        List<InputChannel> matches = getInputChannelsInPartition(inputGate, resultPartitionId);
        Assertions.assertThat(matches).hasSize(1);
        return matches.get(0);
    }

    /**
     * Creates a gate that consumes only subpartition 0, uses a freshly generated local location
     * and passes {@code null} for both the connection manager and the result partition manager.
     */
    static SingleInputGate createSingleInputGate(IntermediateResultPartitionID[] partitionIds, ResultPartitionType resultPartitionType, NettyShuffleEnvironment netEnv) throws IOException {
        IndexRange firstSubpartitionOnly = new IndexRange(0, 0);
        ResourceID localLocation = ResourceID.generate();
        return createSingleInputGate(partitionIds, resultPartitionType, firstSubpartitionOnly, netEnv, localLocation, null, null);
    }

    /**
     * Creates a gate over the first three entries of {@code partitionIds} through a
     * {@link SingleInputGateFactory}.
     *
     * <p>The descriptor for {@code partitionIds[0]} is located at {@code localLocation}, the one
     * for {@code partitionIds[1]} at a freshly generated resource, and {@code partitionIds[2]} is
     * described by an {@link UnknownShuffleDescriptor}.
     *
     * @param connectionManager connection manager to use; when {@code null} the one of
     *     {@code netEnv} is used
     * @param resultPartitionManager partition manager to use; when {@code null} the one of
     *     {@code netEnv} is used
     */
    static SingleInputGate createSingleInputGate(IntermediateResultPartitionID[] partitionIds, ResultPartitionType resultPartitionType, IndexRange subpartitionIndexRange, NettyShuffleEnvironment netEnv, ResourceID localLocation, ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager) throws IOException {
        TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex atLocalLocation =
                new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex((ShuffleDescriptor)NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[0], localLocation), 0);
        TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex atOtherLocation =
                new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex((ShuffleDescriptor)NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[1], ResourceID.generate()), 1);
        TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex unknown =
                new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex((ShuffleDescriptor)new UnknownShuffleDescriptor(new ResultPartitionID(partitionIds[2], ExecutionGraphTestUtils.createExecutionAttemptId())), 2);
        TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] channelDescs = {atLocalLocation, atOtherLocation, unknown};

        InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), resultPartitionType, subpartitionIndexRange, channelDescs.length, Collections.singletonList(new TaskDeploymentDescriptor.NonOffloaded((SerializedValue)CompressedSerializedValue.fromObject((Object)new TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup(channelDescs)))));

        // Fall back to the environment's managers when the caller did not supply any.
        ConnectionManager effectiveConnectionManager = connectionManager;
        if (effectiveConnectionManager == null) {
            effectiveConnectionManager = netEnv.getConnectionManager();
        }
        ResultPartitionManager effectivePartitionManager = resultPartitionManager;
        if (effectivePartitionManager == null) {
            effectivePartitionManager = netEnv.getResultPartitionManager();
        }

        TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        SingleInputGateFactory gateFactory = new SingleInputGateFactory(localLocation, netEnv.getConfiguration(), effectiveConnectionManager, effectivePartitionManager, (TaskEventPublisher)new TaskEventDispatcher(), netEnv.getNetworkBufferPool(), null, null);
        return gateFactory.create(netEnv.createShuffleIOOwnerContext("TestTask", taskMetricGroup.executionId(), (MetricGroup)taskMetricGroup), 0, gateDesc, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, InputChannelTestUtils.newUnregisteredInputChannelMetrics());
    }

    /**
     * Creates {@code numberOfGates} pipelined input gates in {@code network} and returns them
     * keyed by their {@link InputGateID}.
     *
     * <p>All gates share one array of {@code numberOfLocalChannels} channel descriptors, each
     * built with a new partition id and a freshly generated resource location, and all gates
     * belong to the same consumer execution attempt.
     */
    private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(NettyShuffleEnvironment network, int numberOfGates, int numberOfLocalChannels) throws IOException {
        TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] channelDescs = new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[numberOfLocalChannels];
        for (int channelIndex = 0; channelIndex < numberOfLocalChannels; ++channelIndex) {
            channelDescs[channelIndex] = new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex((ShuffleDescriptor)NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), ResourceID.generate()), channelIndex);
        }

        IntermediateDataSetID[] dataSetIds = new IntermediateDataSetID[numberOfGates];
        InputGateDeploymentDescriptor[] gateDescs = new InputGateDeploymentDescriptor[numberOfGates];
        for (int gateIndex = 0; gateIndex < numberOfGates; ++gateIndex) {
            IntermediateDataSetID dataSetId = new IntermediateDataSetID();
            dataSetIds[gateIndex] = dataSetId;
            gateDescs[gateIndex] = new InputGateDeploymentDescriptor(dataSetId, ResultPartitionType.PIPELINED, 0, channelDescs);
        }

        ExecutionAttemptID consumerID = ExecutionGraphTestUtils.createExecutionAttemptId();
        SingleInputGate[] gates = network.createInputGates(network.createShuffleIOOwnerContext("", consumerID, (MetricGroup)new UnregisteredMetricsGroup()), SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, Arrays.asList(gateDescs)).toArray(new SingleInputGate[0]);

        // Gate i was created from descriptor i, so pair it with data set id i.
        Map<InputGateID, SingleInputGate> gatesById = new HashMap<>();
        for (int gateIndex = 0; gateIndex < numberOfGates; ++gateIndex) {
            gatesById.put(new InputGateID(dataSetIds[gateIndex], consumerID), gates[gateIndex]);
        }
        return gatesById;
    }

    /**
     * Builds an unknown input channel for {@code partitionId} at {@code channelIndex}, configured
     * from {@code network} and using a new {@code TestingConnectionManager}.
     */
    private InputChannel buildUnknownInputChannel(NettyShuffleEnvironment network, SingleInputGate inputGate, ResultPartitionID partitionId, int channelIndex) {
        InputChannelBuilder channelBuilder = InputChannelBuilder.newBuilder()
                .setChannelIndex(channelIndex)
                .setPartitionId(partitionId)
                .setupFromNettyShuffleEnvironment(network)
                .setConnectionManager(new TestingConnectionManager());
        return channelBuilder.buildUnknownChannel(inputGate);
    }

    /** Builds a shuffle environment from a {@code NettyShuffleEnvironmentBuilder} left at its defaults. */
    private NettyShuffleEnvironment createNettyShuffleEnvironment() {
        NettyShuffleEnvironmentBuilder defaultBuilder = new NettyShuffleEnvironmentBuilder();
        return defaultBuilder.build();
    }

    /**
     * Reads the next element from {@code inputGate} and checks it against the expectations.
     *
     * @param inputGate gate to read from
     * @param expectedIsBuffer whether the element must be a buffer (as opposed to an event)
     * @param expectedChannelIndex index of the channel whose channel info the element must carry
     * @param expectedMoreAvailable expected value of the element's {@code moreAvailable()} flag;
     *     when {@code false}, a subsequent {@code pollNext()} must additionally return empty
     */
    static void verifyBufferOrEvent(InputGate inputGate, boolean expectedIsBuffer, int expectedChannelIndex, boolean expectedMoreAvailable) throws IOException, InterruptedException {
        Optional<BufferOrEvent> next = inputGate.getNext();
        Assertions.assertThat(next.isPresent()).isTrue();

        BufferOrEvent bufferOrEvent = next.get();
        Assertions.assertThat(bufferOrEvent.isBuffer()).isEqualTo(expectedIsBuffer);
        Assertions.assertThat(bufferOrEvent.getChannelInfo()).isEqualTo(inputGate.getChannel(expectedChannelIndex).getChannelInfo());
        Assertions.assertThat(bufferOrEvent.moreAvailable()).isEqualTo(expectedMoreAvailable);
        if (!expectedMoreAvailable) {
            Assertions.assertThat(inputGate.pollNext().isPresent()).isFalse();
        }
    }

    /**
     * Creates a pipelined three-channel gate in {@code environment} and installs a
     * remote-recovered channel at index 0, a local-recovered channel at index 1 and an unknown
     * channel at index 2.
     */
    private SingleInputGate createInputGate(NettyShuffleEnvironment environment) {
        SingleInputGate gate = createInputGate(environment, 3, ResultPartitionType.PIPELINED);
        InputChannel[] channels = {
            new InputChannelBuilder().setChannelIndex(0).buildRemoteRecoveredChannel(gate),
            new InputChannelBuilder().setChannelIndex(1).buildLocalRecoveredChannel(gate),
            new InputChannelBuilder().setChannelIndex(2).buildUnknownChannel(gate)
        };
        gate.setInputChannels(channels);
        return gate;
    }

    /**
     * {@link TaskEventPublisher} stub that reports every event as published and counts how often
     * {@code publish} was called.
     */
    private static class TestingTaskEventPublisher implements TaskEventPublisher {
        /** Number of {@code publish} invocations so far. */
        private int counter = 0;

        private TestingTaskEventPublisher() {
        }

        /** Counts the call and returns {@code true}; both arguments are ignored. */
        public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
            this.counter += 1;
            return true;
        }
    }

    /**
     * {@link ResultPartitionManager} whose {@code createSubpartitionView} always hands out the
     * view supplied at construction time and counts how often it was called.
     */
    public static class TestingResultPartitionManager extends ResultPartitionManager {
        /** Number of {@code createSubpartitionView} invocations so far. */
        private int counter = 0;

        /** View returned for every request, regardless of partition or subpartition. */
        private final ResultSubpartitionView subpartitionView;

        public TestingResultPartitionManager(ResultSubpartitionView subpartitionView) {
            this.subpartitionView = subpartitionView;
        }

        /** Counts the call and returns the preconfigured view; all arguments are ignored. */
        public ResultSubpartitionView createSubpartitionView(ResultPartitionID partitionId, ResultSubpartitionIndexSet subpartitionIndexSet, BufferAvailabilityListener availabilityListener) throws IOException {
            this.counter += 1;
            return this.subpartitionView;
        }
    }
}

