/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Tuple2;

public class RemoteInputChannelTest {
    @Test
    public void testExceptionOnReordering() throws Exception {
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        RemoteInputChannel inputChannel = this.createRemoteInputChannel(inputGate);
        inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0);
        inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29);
        try {
            inputChannel.getNextBuffer();
            Assert.fail((String)"Did not throw expected exception after enqueuing an out-of-order buffer.");
        }
        catch (Exception exception) {
            // empty catch block
        }
        ((SingleInputGate)Mockito.verify((Object)inputGate, (VerificationMode)Mockito.times((int)2))).onAvailableBuffer((InputChannel)Matchers.eq((Object)inputChannel));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentOnBufferAndRelease() throws Exception {
        int numberOfRepetitions = 8192;
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
            for (int i = 0; i < 8192; ++i) {
                final RemoteInputChannel inputChannel = this.createRemoteInputChannel(inputGate);
                Callable<Void> enqueueTask = new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        do {
                            for (int j = 0; j < 128; ++j) {
                                inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j);
                            }
                        } while (!inputChannel.isReleased());
                        return null;
                    }
                };
                Callable<Void> releaseTask = new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        inputChannel.releaseAllResources();
                        return null;
                    }
                };
                ArrayList results = Lists.newArrayListWithCapacity((int)2);
                results.add(executor.submit(enqueueTask));
                results.add(executor.submit(releaseTask));
                for (Future result : results) {
                    result.get();
                }
                Assert.assertEquals((String)"Resource leak during concurrent release and enqueue.", (long)0L, (long)inputChannel.getNumberOfQueuedBuffers());
            }
        }
        finally {
            executor.shutdown();
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testRetriggerWithoutPartitionRequest() throws Exception {
        Tuple2 backoff = new Tuple2((Object)500, (Object)3000);
        PartitionRequestClient connClient = (PartitionRequestClient)Mockito.mock(PartitionRequestClient.class);
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        RemoteInputChannel ch = this.createRemoteInputChannel(inputGate, connClient, (Tuple2<Integer, Integer>)backoff);
        ch.retriggerSubpartitionRequest(0);
    }

    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        Tuple2 backoff = new Tuple2((Object)500, (Object)3000);
        int[] expectedDelays = new int[]{(Integer)backoff._1(), 1000, 2000, (Integer)backoff._2()};
        PartitionRequestClient connClient = (PartitionRequestClient)Mockito.mock(PartitionRequestClient.class);
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        RemoteInputChannel ch = this.createRemoteInputChannel(inputGate, connClient, (Tuple2<Integer, Integer>)backoff);
        ch.requestSubpartition(0);
        ((PartitionRequestClient)Mockito.verify((Object)connClient)).requestSubpartition((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (RemoteInputChannel)Matchers.eq((Object)ch), Matchers.eq((int)0));
        for (int expected : expectedDelays) {
            ch.retriggerSubpartitionRequest(0);
            ((PartitionRequestClient)Mockito.verify((Object)connClient)).requestSubpartition((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (RemoteInputChannel)Matchers.eq((Object)ch), Matchers.eq((int)expected));
        }
        try {
            ch.retriggerSubpartitionRequest(0);
            ch.getNextBuffer();
            Assert.fail((String)"Did not throw expected exception.");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testPartitionRequestSingleBackoff() throws Exception {
        Tuple2 backoff = new Tuple2((Object)500, (Object)500);
        PartitionRequestClient connClient = (PartitionRequestClient)Mockito.mock(PartitionRequestClient.class);
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        RemoteInputChannel ch = this.createRemoteInputChannel(inputGate, connClient, (Tuple2<Integer, Integer>)backoff);
        ch.requestSubpartition(0);
        ((PartitionRequestClient)Mockito.verify((Object)connClient)).requestSubpartition((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (RemoteInputChannel)Matchers.eq((Object)ch), Matchers.eq((int)0));
        ch.retriggerSubpartitionRequest(0);
        ((PartitionRequestClient)Mockito.verify((Object)connClient)).requestSubpartition((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (RemoteInputChannel)Matchers.eq((Object)ch), ((Integer)Matchers.eq((Object)backoff._1())).intValue());
        try {
            ch.retriggerSubpartitionRequest(0);
            ch.getNextBuffer();
            Assert.fail((String)"Did not throw expected exception.");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testPartitionRequestNoBackoff() throws Exception {
        Tuple2 backoff = new Tuple2((Object)0, (Object)0);
        PartitionRequestClient connClient = (PartitionRequestClient)Mockito.mock(PartitionRequestClient.class);
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        RemoteInputChannel ch = this.createRemoteInputChannel(inputGate, connClient, (Tuple2<Integer, Integer>)backoff);
        ch.requestSubpartition(0);
        ((PartitionRequestClient)Mockito.verify((Object)connClient)).requestSubpartition((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (RemoteInputChannel)Matchers.eq((Object)ch), Matchers.eq((int)0));
        try {
            ch.retriggerSubpartitionRequest(0);
            ch.getNextBuffer();
            Assert.fail((String)"Did not throw expected exception.");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testOnFailedPartitionRequest() throws Exception {
        ConnectionManager connectionManager = (ConnectionManager)Mockito.mock(ConnectionManager.class);
        Mockito.when((Object)connectionManager.createPartitionRequestClient((ConnectionID)Matchers.any(ConnectionID.class))).thenReturn(Mockito.mock(PartitionRequestClient.class));
        ResultPartitionID partitionId = new ResultPartitionID();
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        RemoteInputChannel ch = new RemoteInputChannel(inputGate, 0, partitionId, (ConnectionID)Mockito.mock(ConnectionID.class), connectionManager);
        ch.onFailedPartitionRequest();
        ((SingleInputGate)Mockito.verify((Object)inputGate)).triggerPartitionStateCheck((ResultPartitionID)Matchers.eq((Object)partitionId));
    }

    @Test(expected=CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        ConnectionManager connManager = (ConnectionManager)Mockito.mock(ConnectionManager.class);
        Mockito.when((Object)connManager.createPartitionRequestClient((ConnectionID)Matchers.any(ConnectionID.class))).thenReturn(Mockito.mock(PartitionRequestClient.class));
        RemoteInputChannel ch = new RemoteInputChannel((SingleInputGate)Mockito.mock(SingleInputGate.class), 0, new ResultPartitionID(), (ConnectionID)Mockito.mock(ConnectionID.class), connManager);
        ch.onError((Throwable)new ProducerFailedException((Throwable)new RuntimeException("Expected test exception.")));
        ch.requestSubpartition(0);
        ch.getNextBuffer();
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) throws IOException, InterruptedException {
        return this.createRemoteInputChannel(inputGate, (PartitionRequestClient)Mockito.mock(PartitionRequestClient.class), (Tuple2<Integer, Integer>)new Tuple2((Object)0, (Object)0));
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient partitionRequestClient, Tuple2<Integer, Integer> initialAndMaxRequestBackoff) throws IOException, InterruptedException {
        ConnectionManager connectionManager = (ConnectionManager)Mockito.mock(ConnectionManager.class);
        Mockito.when((Object)connectionManager.createPartitionRequestClient((ConnectionID)Matchers.any(ConnectionID.class))).thenReturn((Object)partitionRequestClient);
        return new RemoteInputChannel(inputGate, 0, new ResultPartitionID(), (ConnectionID)Mockito.mock(ConnectionID.class), connectionManager, initialAndMaxRequestBackoff);
    }
}

