/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateTestBase;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalRecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SingleInputGateTest
extends InputGateTestBase {
    @Test(expected=CheckpointException.class)
    public void testCheckpointsDeclinedUnlessAllChannelsAreKnown() throws CheckpointException {
        SingleInputGate gate = this.createInputGate(this.createNettyShuffleEnvironment(), 1, ResultPartitionType.PIPELINED);
        gate.setInputChannels(new InputChannel[]{new InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(gate)});
        gate.checkpointStarted(new CheckpointBarrier(1L, 1L, CheckpointOptions.alignedNoTimeout((CheckpointType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault())));
    }

    @Test(expected=CheckpointException.class)
    public void testCheckpointsDeclinedUnlessStateConsumed() throws CheckpointException {
        SingleInputGate gate = this.createInputGate(this.createNettyShuffleEnvironment());
        Preconditions.checkState((!gate.getStateConsumedFuture().isDone() ? 1 : 0) != 0);
        gate.checkpointStarted(new CheckpointBarrier(1L, 1L, CheckpointOptions.alignedNoTimeout((CheckpointType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault())));
    }

    @Test
    public void testSetupLogic() throws Exception {
        NettyShuffleEnvironment environment = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(environment);
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)environment).close());
            closer.register(() -> ((SingleInputGate)inputGate).close());
            Assert.assertNull((Object)inputGate.getBufferPool());
            for (InputChannel inputChannel : inputGate.getInputChannels().values()) {
                Assert.assertTrue((inputChannel instanceof RecoveredInputChannel || inputChannel instanceof UnknownInputChannel ? 1 : 0) != 0);
                if (!(inputChannel instanceof RecoveredInputChannel)) continue;
                Assert.assertEquals((long)0L, (long)((RecoveredInputChannel)inputChannel).bufferManager.getNumberOfAvailableBuffers());
            }
            inputGate.setup();
            Assert.assertNotNull((Object)inputGate.getBufferPool());
            Assert.assertEquals((long)1L, (long)inputGate.getBufferPool().getNumberOfRequiredMemorySegments());
            for (InputChannel inputChannel : inputGate.getInputChannels().values()) {
                if (inputChannel instanceof RemoteRecoveredInputChannel) {
                    Assert.assertEquals((long)0L, (long)((RemoteRecoveredInputChannel)inputChannel).bufferManager.getNumberOfAvailableBuffers());
                    continue;
                }
                if (!(inputChannel instanceof LocalRecoveredInputChannel)) continue;
                Assert.assertEquals((long)0L, (long)((LocalRecoveredInputChannel)inputChannel).bufferManager.getNumberOfAvailableBuffers());
            }
            inputGate.convertRecoveredInputChannels();
            Assert.assertNotNull((Object)inputGate.getBufferPool());
            Assert.assertEquals((long)1L, (long)inputGate.getBufferPool().getNumberOfRequiredMemorySegments());
            for (InputChannel inputChannel : inputGate.getInputChannels().values()) {
                if (!(inputChannel instanceof RemoteInputChannel)) continue;
                Assert.assertEquals((long)2L, (long)((RemoteInputChannel)inputChannel).getNumberOfAvailableBuffers());
            }
        }
    }

    @Test
    public void testPartitionRequestLogic() throws Exception {
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate gate = this.createInputGate(environment);
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)environment).close());
            closer.register(() -> ((SingleInputGate)gate).close());
            gate.finishReadRecoveredState();
            while (!gate.getStateConsumedFuture().isDone()) {
                gate.pollNext();
            }
            gate.requestPartitions();
            gate.pollNext();
            InputChannel remoteChannel = gate.getChannel(0);
            MatcherAssert.assertThat((Object)remoteChannel, (Matcher)Matchers.instanceOf(RemoteInputChannel.class));
            Assert.assertNotNull((Object)((RemoteInputChannel)remoteChannel).getPartitionRequestClient());
            Assert.assertEquals((long)2L, (long)((RemoteInputChannel)remoteChannel).getInitialCredit());
            InputChannel localChannel = gate.getChannel(1);
            MatcherAssert.assertThat((Object)localChannel, (Matcher)Matchers.instanceOf(LocalInputChannel.class));
            Assert.assertNotNull((Object)((LocalInputChannel)localChannel).getSubpartitionView());
            MatcherAssert.assertThat((Object)gate.getChannel(2), (Matcher)Matchers.instanceOf(UnknownInputChannel.class));
        }
    }

    @Test
    public void testBasicGetNextLogic() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        InputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels(inputChannels);
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer();
        inputChannels[1].readBuffer();
        inputChannels[1].readEndOfData();
        inputChannels[0].readEndOfData();
        inputChannels[1].readEndOfPartitionEvent();
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        inputGate.notifyChannelNonEmpty(inputChannels[1]);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 1, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 1, true);
        Assert.assertFalse((boolean)inputGate.hasReceivedEndOfData());
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 0, true);
        Assert.assertFalse((boolean)inputGate.isFinished());
        Assert.assertTrue((boolean)inputGate.hasReceivedEndOfData());
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 1, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 0, false);
        Assert.assertTrue((boolean)inputGate.hasReceivedEndOfData());
        Assert.assertTrue((boolean)inputGate.isFinished());
        for (InputChannel ic : inputChannels) {
            ic.assertReturnedEventsAreRecycled();
        }
    }

    @Test
    public void testGetCompressedBuffer() throws Exception {
        int bufferSize = 1024;
        String compressionCodec = "LZ4";
        BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
        BufferDecompressor decompressor = new BufferDecompressor(bufferSize, compressionCodec);
        try (SingleInputGate inputGate = new SingleInputGateBuilder().setBufferDecompressor(decompressor).build();){
            TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
            for (int i = 0; i < bufferSize; i += 8) {
                segment.putLongLittleEndian(i, (long)i);
            }
            NetworkBuffer uncompressedBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
            uncompressedBuffer.setSize(bufferSize);
            Buffer compressedBuffer = compressor.compressToOriginalBuffer((Buffer)uncompressedBuffer);
            Assert.assertTrue((boolean)compressedBuffer.isCompressed());
            inputChannel.read(compressedBuffer);
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.notifyChannelNonEmpty((InputChannel)inputChannel);
            Optional bufferOrEvent = inputGate.getNext();
            Assert.assertTrue((boolean)bufferOrEvent.isPresent());
            Assert.assertTrue((boolean)((BufferOrEvent)bufferOrEvent.get()).isBuffer());
            ByteBuffer buffer = ((BufferOrEvent)bufferOrEvent.get()).getBuffer().getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
            for (int i = 0; i < bufferSize; i += 8) {
                Assert.assertEquals((long)i, (long)buffer.getLong());
            }
        }
    }

    @Test
    public void testNotifyAfterEndOfPartition() throws Exception {
        SingleInputGate inputGate = this.createInputGate(2);
        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
        inputGate.setInputChannels(new InputChannel[]{inputChannel, new TestInputChannel(inputGate, 1)});
        inputChannel.readEndOfPartitionEvent();
        inputChannel.notifyChannelNonEmpty();
        Assert.assertEquals((Object)EndOfPartitionEvent.INSTANCE, (Object)((BufferOrEvent)inputGate.pollNext().get()).getEvent());
        inputChannel.notifyChannelNonEmpty();
        Assert.assertFalse((boolean)inputGate.pollNext().isPresent());
    }

    @Test
    public void testIsAvailable() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        this.testIsAvailable((InputGate)inputGate, inputGate, inputChannel);
    }

    @Test
    public void testIsAvailableAfterFinished() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        this.testIsAvailableAfterFinished((InputGate)inputGate, () -> {
            inputChannel.readEndOfPartitionEvent();
            inputGate.notifyChannelNonEmpty((InputChannel)inputChannel);
        });
    }

    @Test
    public void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        InputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels(inputChannels);
        inputChannels[0].readBuffer();
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 0, false);
    }

    @Test
    public void testBackwardsEventWithUninitializedChannel() throws Exception {
        TestingTaskEventPublisher taskEventPublisher = new TestingTaskEventPublisher();
        TestingResultPartitionManager partitionManager = new TestingResultPartitionManager((ResultSubpartitionView)new NoOpResultSubpartitionView());
        NettyShuffleEnvironment environment = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(environment, 2, ResultPartitionType.PIPELINED);
        InputChannel[] inputChannels = new InputChannel[2];
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)environment).close());
            closer.register(() -> ((SingleInputGate)inputGate).close());
            ResultPartitionID localPartitionId = new ResultPartitionID();
            inputChannels[0] = InputChannelBuilder.newBuilder().setPartitionId(localPartitionId).setPartitionManager(partitionManager).setTaskEventPublisher(taskEventPublisher).buildLocalChannel(inputGate);
            ResultPartitionID unknownPartitionId = new ResultPartitionID();
            inputChannels[1] = InputChannelBuilder.newBuilder().setChannelIndex(1).setPartitionId(unknownPartitionId).setPartitionManager(partitionManager).setTaskEventPublisher(taskEventPublisher).buildUnknownChannel(inputGate);
            InputGateFairnessTest.setupInputGate(inputGate, inputChannels);
            Assert.assertEquals((long)1L, (long)partitionManager.counter);
            TestTaskEvent event = new TestTaskEvent();
            inputGate.sendTaskEvent((TaskEvent)event);
            Assert.assertEquals((long)1L, (long)taskEventPublisher.counter);
            ResourceID location = ResourceID.generate();
            inputGate.updateInputChannel(location, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
            Assert.assertEquals((long)2L, (long)partitionManager.counter);
            Assert.assertEquals((long)2L, (long)taskEventPublisher.counter);
        }
    }

    @Test
    public void testUpdateChannelBeforeRequest() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        TestingResultPartitionManager partitionManager = new TestingResultPartitionManager((ResultSubpartitionView)new NoOpResultSubpartitionView());
        UnknownInputChannel unknown = InputChannelBuilder.newBuilder().setPartitionManager(partitionManager).buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{unknown});
        ResultPartitionID resultPartitionID = unknown.getPartitionId();
        ResourceID location = ResourceID.generate();
        inputGate.updateInputChannel(location, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(resultPartitionID.getPartitionId(), location));
        Assert.assertEquals((long)0L, (long)partitionManager.counter);
    }

    @Test
    public void testReleaseWhilePollingChannel() throws Exception {
        final AtomicReference asyncException = new AtomicReference();
        final SingleInputGate inputGate = this.createInputGate(1);
        UnknownInputChannel inputChannel = InputChannelBuilder.newBuilder().buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        Thread asyncConsumer = new Thread(){

            @Override
            public void run() {
                try {
                    inputGate.getNext();
                }
                catch (Exception e) {
                    asyncException.set(e);
                }
            }
        };
        asyncConsumer.start();
        boolean success = false;
        for (int i = 0; i < 50; ++i) {
            if (asyncConsumer.isAlive()) {
                boolean bl = success = asyncConsumer.getState() == Thread.State.WAITING;
            }
            if (success) break;
            Thread.sleep(100L);
        }
        Assert.assertTrue((String)"Did not trigger blocking buffer request.", (boolean)success);
        inputGate.close();
        asyncConsumer.join();
        Assert.assertNotNull(asyncException.get());
        Assert.assertEquals(IllegalStateException.class, ((Exception)asyncException.get()).getClass());
    }

    @Test
    public void testRequestBackoffConfiguration() throws Exception {
        IntermediateResultPartitionID[] partitionIds = new IntermediateResultPartitionID[]{new IntermediateResultPartitionID(), new IntermediateResultPartitionID(), new IntermediateResultPartitionID()};
        ResourceID localLocation = ResourceID.generate();
        ShuffleDescriptor[] channelDescs = new ShuffleDescriptor[]{NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[0], localLocation), NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(partitionIds[1], ResourceID.generate()), new UnknownShuffleDescriptor(new ResultPartitionID(partitionIds[2], new ExecutionAttemptID()))};
        InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, channelDescs);
        int initialBackoff = 137;
        int maxBackoff = 1001;
        NettyShuffleEnvironment netEnv = new NettyShuffleEnvironmentBuilder().setPartitionRequestInitialBackoff(initialBackoff).setPartitionRequestMaxBackoff(maxBackoff).build();
        SingleInputGate gate = new SingleInputGateFactory(localLocation, netEnv.getConfiguration(), netEnv.getConnectionManager(), netEnv.getResultPartitionManager(), (TaskEventPublisher)new TaskEventDispatcher(), netEnv.getNetworkBufferPool()).create("TestTask", 0, gateDesc, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, InputChannelTestUtils.newUnregisteredInputChannelMetrics());
        gate.setChannelStateWriter(ChannelStateWriter.NO_OP);
        gate.finishReadRecoveredState();
        while (!gate.getStateConsumedFuture().isDone()) {
            gate.pollNext();
        }
        gate.convertRecoveredInputChannels();
        try (Closer closer = Closer.create();){
            InputChannel[] channels;
            closer.register(() -> ((NettyShuffleEnvironment)netEnv).close());
            closer.register(() -> ((SingleInputGate)gate).close());
            Assert.assertEquals((Object)gateDesc.getConsumedPartitionType(), (Object)gate.getConsumedPartitionType());
            Map channelMap = gate.getInputChannels();
            Assert.assertEquals((long)3L, (long)channelMap.size());
            channelMap.values().forEach(channel -> {
                try {
                    channel.checkError();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            InputChannel localChannel = (InputChannel)channelMap.get(partitionIds[0]);
            Assert.assertEquals(LocalInputChannel.class, localChannel.getClass());
            InputChannel remoteChannel = (InputChannel)channelMap.get(partitionIds[1]);
            Assert.assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
            InputChannel unknownChannel = (InputChannel)channelMap.get(partitionIds[2]);
            Assert.assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
            for (InputChannel ch : channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}) {
                Assert.assertEquals((long)0L, (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)initialBackoff, (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)(initialBackoff * 2), (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)(initialBackoff * 2 * 2), (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)maxBackoff, (long)ch.getCurrentBackoff());
                Assert.assertFalse((boolean)ch.increaseBackoff());
            }
        }
    }

    @Test
    public void testRequestBuffersWithRemoteInputChannel() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)network).close());
            closer.register(() -> ((SingleInputGate)inputGate).close());
            RemoteInputChannel remote = InputChannelBuilder.newBuilder().setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
            inputGate.setInputChannels(new InputChannel[]{remote});
            inputGate.setup();
            NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            Assert.assertEquals((long)buffersPerChannel, (long)remote.getNumberOfAvailableBuffers());
            Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel - 1), (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
        }
    }

    @Test
    public void testRequestBuffersWithUnknownInputChannel() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        SingleInputGate inputGate = this.createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)network).close());
            closer.register(() -> ((SingleInputGate)inputGate).close());
            ResultPartitionID resultPartitionId = new ResultPartitionID();
            InputChannel inputChannel = this.buildUnknownInputChannel(network, inputGate, resultPartitionId, 0);
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - 1), (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
            inputGate.updateInputChannel(ResourceID.generate(), NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(resultPartitionId.getPartitionId(), ResourceID.generate()));
            RemoteInputChannel remote = (RemoteInputChannel)inputGate.getInputChannels().get(resultPartitionId.getPartitionId());
            Assert.assertEquals((long)buffersPerChannel, (long)remote.getNumberOfAvailableBuffers());
            Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel - 1), (long)bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
        }
    }

    @Test
    public void testUpdateUnknownInputChannel() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        ResultPartition localResultPartition = new ResultPartitionBuilder().setResultPartitionManager(network.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(network).build();
        ResultPartition remoteResultPartition = new ResultPartitionBuilder().setResultPartitionManager(network.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(network).build();
        localResultPartition.setup();
        remoteResultPartition.setup();
        SingleInputGate inputGate = this.createInputGate(network, 2, ResultPartitionType.PIPELINED);
        InputChannel[] inputChannels = new InputChannel[2];
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)network).close());
            closer.register(() -> ((SingleInputGate)inputGate).close());
            ResultPartitionID localResultPartitionId = localResultPartition.getPartitionId();
            inputChannels[0] = this.buildUnknownInputChannel(network, inputGate, localResultPartitionId, 0);
            ResultPartitionID remoteResultPartitionId = remoteResultPartition.getPartitionId();
            inputChannels[1] = this.buildUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
            inputGate.setInputChannels(inputChannels);
            inputGate.setup();
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(UnknownInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(UnknownInputChannel.class)));
            ResourceID localLocation = ResourceID.generate();
            inputGate.updateInputChannel(localLocation, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(remoteResultPartitionId.getPartitionId(), ResourceID.generate()));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(UnknownInputChannel.class)));
            inputGate.updateInputChannel(localLocation, NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(localResultPartitionId.getPartitionId(), localLocation));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(LocalInputChannel.class)));
        }
    }

    @Test
    public void testQueuedBuffers() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        BufferWritingResultPartition resultPartition = (BufferWritingResultPartition)new ResultPartitionBuilder().setResultPartitionManager(network.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(network).build();
        SingleInputGate inputGate = this.createInputGate(network, 2, ResultPartitionType.PIPELINED);
        ResultPartitionID localResultPartitionId = resultPartition.getPartitionId();
        InputChannel[] inputChannels = new InputChannel[2];
        RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder().setChannelIndex(1).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
        inputChannels[0] = remoteInputChannel;
        inputChannels[1] = InputChannelBuilder.newBuilder().setChannelIndex(0).setPartitionId(localResultPartitionId).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildLocalChannel(inputGate);
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)network).close());
            closer.register(() -> ((SingleInputGate)inputGate).close());
            closer.register(() -> ((BufferWritingResultPartition)resultPartition).release());
            resultPartition.setup();
            InputGateFairnessTest.setupInputGate(inputGate, inputChannels);
            remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
            Assert.assertEquals((long)1L, (long)inputGate.getNumberOfQueuedBuffers());
            resultPartition.emitRecord(ByteBuffer.allocate(1), 0);
            Assert.assertEquals((long)2L, (long)inputGate.getNumberOfQueuedBuffers());
        }
    }

    @Test
    public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        ResultPartitionID partitionId = localChannel.getPartitionId();
        inputGate.setInputChannels(new InputChannel[]{localChannel});
        localChannel.setError((Throwable)new PartitionNotFoundException(partitionId));
        try {
            inputGate.getNext();
            Assert.fail((String)"Should throw a PartitionNotFoundException.");
        }
        catch (PartitionNotFoundException notFound) {
            MatcherAssert.assertThat((Object)partitionId, (Matcher)Matchers.is((Object)notFound.getPartitionId()));
        }
    }

    @Test
    public void testInputGateRemovalFromNettyShuffleEnvironment() throws Exception {
        NettyShuffleEnvironment network = this.createNettyShuffleEnvironment();
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NettyShuffleEnvironment)network).close());
            int numberOfGates = 10;
            Map<InputGateID, SingleInputGate> createdInputGatesById = SingleInputGateTest.createInputGateWithLocalChannels(network, numberOfGates, 1);
            Assert.assertEquals((long)numberOfGates, (long)createdInputGatesById.size());
            for (InputGateID id : createdInputGatesById.keySet()) {
                MatcherAssert.assertThat((Object)network.getInputGate(id).isPresent(), (Matcher)Matchers.is((Object)true));
                createdInputGatesById.get(id).close();
                MatcherAssert.assertThat((Object)network.getInputGate(id).isPresent(), (Matcher)Matchers.is((Object)false));
            }
        }
    }

    @Test
    public void testSingleInputGateInfo() {
        int numSingleInputGates = 2;
        int numInputChannels = 3;
        for (int i = 0; i < 2; ++i) {
            SingleInputGate gate = new SingleInputGateBuilder().setSingleInputGateIndex(i).setNumberOfChannels(3).build();
            int channelCounter = 0;
            for (InputChannel inputChannel : gate.getInputChannels().values()) {
                InputChannelInfo channelInfo = inputChannel.getChannelInfo();
                Assert.assertEquals((long)i, (long)channelInfo.getGateIdx());
                Assert.assertEquals((long)channelCounter++, (long)channelInfo.getInputChannelIdx());
            }
        }
    }

    @Test
    public void testGetUnfinishedChannels() throws IOException, InterruptedException {
        SingleInputGate inputGate = new SingleInputGateBuilder().setSingleInputGateIndex(1).setNumberOfChannels(3).build();
        InputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1), new TestInputChannel(inputGate, 2)};
        inputGate.setInputChannels(inputChannels);
        Assert.assertEquals(Arrays.asList(inputChannels[0].getChannelInfo(), inputChannels[1].getChannelInfo(), inputChannels[2].getChannelInfo()), (Object)inputGate.getUnfinishedChannels());
        inputChannels[1].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[1]);
        inputGate.getNext();
        Assert.assertEquals(Arrays.asList(inputChannels[0].getChannelInfo(), inputChannels[2].getChannelInfo()), (Object)inputGate.getUnfinishedChannels());
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0]);
        inputGate.getNext();
        Assert.assertEquals(Collections.singletonList(inputChannels[2].getChannelInfo()), (Object)inputGate.getUnfinishedChannels());
        inputChannels[2].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[2]);
        inputGate.getNext();
        Assert.assertEquals(Collections.emptyList(), (Object)inputGate.getUnfinishedChannels());
    }

    private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(NettyShuffleEnvironment network, int numberOfGates, int numberOfLocalChannels) throws IOException {
        NettyShuffleDescriptor[] channelDescs = new NettyShuffleDescriptor[numberOfLocalChannels];
        for (int i = 0; i < numberOfLocalChannels; ++i) {
            channelDescs[i] = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), ResourceID.generate());
        }
        InputGateDeploymentDescriptor[] gateDescs = new InputGateDeploymentDescriptor[numberOfGates];
        IntermediateDataSetID[] ids = new IntermediateDataSetID[numberOfGates];
        for (int i = 0; i < numberOfGates; ++i) {
            ids[i] = new IntermediateDataSetID();
            gateDescs[i] = new InputGateDeploymentDescriptor(ids[i], ResultPartitionType.PIPELINED, 0, (ShuffleDescriptor[])channelDescs);
        }
        ExecutionAttemptID consumerID = new ExecutionAttemptID();
        SingleInputGate[] gates = network.createInputGates(network.createShuffleIOOwnerContext("", consumerID, (MetricGroup)new UnregisteredMetricsGroup()), SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, Arrays.asList(gateDescs)).toArray(new SingleInputGate[0]);
        HashMap<InputGateID, SingleInputGate> inputGatesById = new HashMap<InputGateID, SingleInputGate>();
        for (int i = 0; i < numberOfGates; ++i) {
            inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]);
        }
        return inputGatesById;
    }

    private InputChannel buildUnknownInputChannel(NettyShuffleEnvironment network, SingleInputGate inputGate, ResultPartitionID partitionId, int channelIndex) {
        return InputChannelBuilder.newBuilder().setChannelIndex(channelIndex).setPartitionId(partitionId).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildUnknownChannel(inputGate);
    }

    private NettyShuffleEnvironment createNettyShuffleEnvironment() {
        return new NettyShuffleEnvironmentBuilder().build();
    }

    static void verifyBufferOrEvent(InputGate inputGate, boolean expectedIsBuffer, int expectedChannelIndex, boolean expectedMoreAvailable) throws IOException, InterruptedException {
        Optional bufferOrEvent = inputGate.getNext();
        Assert.assertTrue((boolean)bufferOrEvent.isPresent());
        Assert.assertEquals((Object)expectedIsBuffer, (Object)((BufferOrEvent)bufferOrEvent.get()).isBuffer());
        Assert.assertEquals((Object)inputGate.getChannel(expectedChannelIndex).getChannelInfo(), (Object)((BufferOrEvent)bufferOrEvent.get()).getChannelInfo());
        Assert.assertEquals((Object)expectedMoreAvailable, (Object)((BufferOrEvent)bufferOrEvent.get()).moreAvailable());
        if (!expectedMoreAvailable) {
            Assert.assertFalse((boolean)inputGate.pollNext().isPresent());
        }
    }

    private SingleInputGate createInputGate(NettyShuffleEnvironment environment) {
        SingleInputGate inputGate = this.createInputGate(environment, 3, ResultPartitionType.PIPELINED);
        RemoteRecoveredInputChannel remoteChannel = new InputChannelBuilder().setChannelIndex(0).buildRemoteRecoveredChannel(inputGate);
        LocalRecoveredInputChannel localChannel = new InputChannelBuilder().setChannelIndex(1).buildLocalRecoveredChannel(inputGate);
        UnknownInputChannel unknownChannel = new InputChannelBuilder().setChannelIndex(2).buildUnknownChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{remoteChannel, localChannel, unknownChannel});
        return inputGate;
    }

    private static class TestingTaskEventPublisher
    implements TaskEventPublisher {
        private int counter = 0;

        private TestingTaskEventPublisher() {
        }

        public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
            ++this.counter;
            return true;
        }
    }

    public static class TestingResultPartitionManager
    extends ResultPartitionManager {
        private int counter = 0;
        private final ResultSubpartitionView subpartitionView;

        public TestingResultPartitionManager(ResultSubpartitionView subpartitionView) {
            this.subpartitionView = subpartitionView;
        }

        public ResultSubpartitionView createSubpartitionView(ResultPartitionID partitionId, int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException {
            ++this.counter;
            return this.subpartitionView;
        }
    }
}

