/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.NetUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class NettyPartitionRequestClientTest {
    @Parameterized.Parameter
    public boolean connectionReuseEnabled;

    @Parameterized.Parameters(name="connection reuse enabled = {0}")
    public static Object[] parameters() {
        return new Object[][]{{true}, {false}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPartitionRequestClientReuse() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, true);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            client.close(inputChannel);
            Assert.assertFalse((boolean)client.canBeDisposed());
            handler.notifyAllChannelsOfErrorAndClose((Throwable)new RuntimeException());
            Assert.assertTrue((boolean)client.canBeDisposed());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetriggerPartitionRequest() throws Exception {
        long deadline = System.currentTimeMillis() + 30000L;
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        int numExclusiveBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelBuilder.newBuilder().setConnectionManager(InputChannelTestUtils.mockConnectionManagerWithPartitionRequestClient((PartitionRequestClient)client)).setInitialBackoff(1).setMaxBackoff(2).buildRemoteChannel(inputGate);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();
            Assert.assertTrue((boolean)channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId(), inputChannel.getConsumedSubpartitionIndex());
            this.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId(), inputChannel.getConsumedSubpartitionIndex());
            this.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDoublePartitionRequest() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        int numExclusiveBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();
            Assert.assertTrue((boolean)channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResumeConsumption() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();
            inputChannel.resumeConsumption();
            channel.runPendingTasks();
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.ResumeConsumption.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.ResumeConsumption)readFromOutbound).receiverId);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAcknowledgeAllRecordsProcessed() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartition();
            inputChannel.acknowledgeAllRecordsProcessed();
            channel.runPendingTasks();
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.AckAllUserRecordsProcessed.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.AckAllUserRecordsProcessed)readFromOutbound).receiverId);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    private NettyPartitionRequestClient createPartitionRequestClient(Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled) throws Exception {
        try (NetUtils.Port availablePort = NetUtils.getAvailablePort();){
            int port = availablePort.getPort();
            ConnectionID connectionID = new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", port), 0);
            NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
            NettyClient nettyClient = new NettyClient(config);
            PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
            NettyPartitionRequestClient nettyPartitionRequestClient = new NettyPartitionRequestClient(tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
            return nettyPartitionRequestClient;
        }
    }

    void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline) throws InterruptedException {
        while (channel.runScheduledPendingTasks() != -1L && System.currentTimeMillis() < deadline) {
            Thread.sleep(1L);
        }
    }
}

