/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.NeverCompletingChannelFuture;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

public class PartitionRequestClientFactoryTest {
    private static final ResourceID RESOURCE_ID = ResourceID.generate();

    /**
     * Interrupts a connection attempt that is blocked on a {@link NeverCompletingChannelFuture}
     * and then issues a second request for the same connection ID against the real client.
     *
     * <p>The test passes when the second request returns without throwing.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testInterruptsNotCached(boolean connectionReuseEnabled) throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            AwaitingNettyClient awaitingClient = new AwaitingNettyClient(serverAndClient.client());
            PartitionRequestClientFactory factory =
                    new PartitionRequestClientFactory(awaitingClient, connectionReuseEnabled);

            // Phase 1: connect() hands out a NeverCompletingChannelFuture; interrupt the caller.
            awaitingClient.awaitForInterrupts = true;
            connectAndInterrupt(factory, serverAndClient.getConnectionID(RESOURCE_ID, 0));

            // Phase 2: connect() delegates to the real client; the request must go through.
            awaitingClient.awaitForInterrupts = false;
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Starts a thread that requests a client for {@code connectionId}, interrupts that thread and
     * waits until the request has terminated.
     *
     * <p>Returns normally only if the request ended with an {@link InterruptedException}. Any
     * other outcome (another throwable, or the request unexpectedly succeeding) is surfaced as an
     * {@link java.util.concurrent.ExecutionException} from this method instead of leaving the
     * caller blocked forever.
     *
     * @param factory factory whose {@code createPartitionRequestClient} is invoked
     * @param connectionId connection ID passed to the factory
     * @throws Exception if the request did not end with an {@link InterruptedException}
     */
    private void connectAndInterrupt(PartitionRequestClientFactory factory, ConnectionID connectionId) throws Exception {
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Void> interrupted = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                started.complete(null);
                factory.createPartitionRequestClient(connectionId);
                // Reaching this line means the request was not interrupted; report it rather
                // than leaving 'interrupted' incomplete, which would hang the calling test.
                interrupted.completeExceptionally(
                        new AssertionError("Expected the connection attempt to be interrupted."));
            } catch (InterruptedException e) {
                interrupted.complete(null);
            } catch (Throwable t) {
                // Throwable (not just Exception) so that an Error cannot leave the future pending.
                interrupted.completeExceptionally(t);
            }
        });
        thread.start();
        // The interrupt flag is sticky, so interrupting right after 'started' is safe even if the
        // thread has not yet reached the blocking call.
        started.get();
        thread.interrupt();
        interrupted.get();
    }

    /**
     * Uses an {@link UnstableNettyClient} that fails its first connect call: the first request
     * must throw a {@link RemoteTransportException}, the following request for the same
     * connection ID must return without throwing.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testExceptionsAreNotCached(boolean connectionReuseEnabled) throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            UnstableNettyClient failsOnce = new UnstableNettyClient(serverAndClient.client(), 1);
            PartitionRequestClientFactory factory =
                    new PartitionRequestClientFactory(failsOnce, connectionReuseEnabled);
            ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);

            Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID))
                    .withFailMessage("Expected the first request to fail.")
                    .isInstanceOf(RemoteTransportException.class);

            // The simulated failure budget is used up; this call must not throw.
            factory.createPartitionRequestClient(connectionID);
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Runs {@link #checkReuseNettyPartitionRequestClient} against one server/client pair for the
     * connection limits 1, 2, 5 and 10, in that order.
     *
     * @param connectionReuseEnabled value forwarded to each factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testReuseNettyPartitionRequestClient(boolean connectionReuseEnabled) throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            for (int connectionLimit : new int[] {1, 2, 5, 10}) {
                checkReuseNettyPartitionRequestClient(
                        connectionReuseEnabled, serverAndClient, connectionLimit);
            }
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Requests clients for many random connection indices and asserts that the number of distinct
     * clients handed out does not exceed {@code maxNumberOfConnections}.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     * @param nettyServerAndClient server/client pair providing the client and the connection IDs
     * @param maxNumberOfConnections connection limit given to the factory and used as upper bound
     */
    private void checkReuseNettyPartitionRequestClient(boolean connectionReuseEnabled, NettyTestUtil.NettyServerAndClient nettyServerAndClient, int maxNumberOfConnections) throws Exception {
        PartitionRequestClientFactory factory =
                new PartitionRequestClientFactory(
                        nettyServerAndClient.client(), 0, maxNumberOfConnections, connectionReuseEnabled);
        HashSet<NettyPartitionRequestClient> distinctClients = new HashSet<>();

        // At least 100 requests, or as many as the connection limit if that is larger.
        int attempts = Math.max(100, maxNumberOfConnections);
        int attempt = 0;
        while (attempt < attempts) {
            int randomIndex = (int) (Math.random() * Integer.MAX_VALUE);
            ConnectionID connectionID = nettyServerAndClient.getConnectionID(RESOURCE_ID, randomIndex);
            distinctClients.add(factory.createPartitionRequestClient(connectionID));
            attempt++;
        }

        Assertions.assertThat(distinctClients.size()).isLessThanOrEqualTo(maxNumberOfConnections);
    }

    /**
     * Creates a client, closes the server side of its channel, waits until the client-side
     * handler reports the channel inactive, and then asserts that a second request for the same
     * connection ID yields a different client instance.
     *
     * <p>The server and client are shut down in a {@code finally} block so that a failed
     * assertion or a failed {@code get()} does not leak them.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testConnectionReuseWhenRemoteCloseAndNoInputChannel(boolean connectionReuseEnabled) throws Exception {
        final CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
        final CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
        NettyProtocol protocol = new NettyProtocol(null, null) {

            @Override
            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[] {new ChannelInboundHandlerAdapter() {

                    @Override
                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        super.channelRegistered(ctx);
                        // Expose the server-side channel so the test can close it.
                        serverChannelFuture.complete(ctx.channel());
                    }
                }};
            }

            @Override
            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[] {new ChannelInactiveFutureHandler(inactiveFuture)};
            }
        };
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol);
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(serverAndClient.client(), 2, 1, connectionReuseEnabled);
            ConnectionID connectionID = serverAndClient.getConnectionID(ResourceID.generate(), 0);
            NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);

            // Close from the remote (server) side and wait for the client to notice.
            Channel channel = serverChannelFuture.get();
            channel.close();
            inactiveFuture.get();

            NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
            Assertions.assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient);
        } finally {
            NettyTestUtil.shutdown(serverAndClient);
        }
    }

    /**
     * Uses an {@link UnstableNettyClient} that fails its first two connect calls and a factory
     * constructed with the arguments {@code (client, 2, 1, connectionReuseEnabled)}; the request
     * must return without throwing.
     *
     * <p>The server and client are shut down in a {@code finally} block, matching the other
     * tests in this class, so a failing request does not leak them.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testNettyClientConnectRetry(boolean connectionReuseEnabled) throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        try {
            UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(unstableNettyClient, 2, 1, connectionReuseEnabled);
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Issues two requests against a factory backed by a {@link FailingNettyClient}; both must
     * throw, and the second one must throw an {@link IOException}.
     *
     * <p>The connection IDs are built before the asserted lambdas run. {@link
     * InetAddress#getLocalHost()} can itself throw an {@code UnknownHostException} (an {@code
     * IOException}); resolving it inside the lambda would let such a failure satisfy the
     * assertions without the factory ever being exercised.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testFailureReportedToSubsequentRequests(boolean connectionReuseEnabled) throws Exception {
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory(new FailingNettyClient(), 2, 1, connectionReuseEnabled);

        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
        // Each request uses its own freshly generated ResourceID, as before.
        ConnectionID firstConnectionId = new ConnectionID(ResourceID.generate(), address, 0);
        ConnectionID secondConnectionId = new ConnectionID(ResourceID.generate(), address, 0);

        Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(firstConnectionId));
        Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(secondConnectionId)).isInstanceOf(IOException.class);
    }

    /**
     * Uses an {@link UnstableNettyClient} that fails its first three connect calls and a factory
     * constructed with the arguments {@code (client, 2, 1, connectionReuseEnabled)}; the request
     * must throw an {@link IOException}.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testNettyClientConnectRetryFailure(boolean connectionReuseEnabled) throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        UnstableNettyClient failsThreeTimes = new UnstableNettyClient(serverAndClient.client(), 3);
        try {
            PartitionRequestClientFactory factory =
                    new PartitionRequestClientFactory(failsThreeTimes, 2, 1, connectionReuseEnabled);

            Assertions.assertThatThrownBy(
                            () -> factory.createPartitionRequestClient(
                                    serverAndClient.getConnectionID(RESOURCE_ID, 0)))
                    .isInstanceOf(IOException.class);
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Submits ten concurrent requests for the same connection ID to a factory backed by an
     * {@link UnstableNettyClient} that fails its first two connect calls; every request must
     * yield a non-null client.
     *
     * <p>Failures propagate with their original cause: an exception thrown by a task surfaces as
     * an {@link java.util.concurrent.ExecutionException} from {@link Future#get()} instead of
     * being flattened to its message. The executor, client and server are released in {@code
     * finally} blocks so a failing run does not leak threads or sockets.
     *
     * @param connectionReuseEnabled value forwarded to the factory under test
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testNettyClientConnectRetryMultipleThread(boolean connectionReuseEnabled) throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        try {
            UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(unstableNettyClient, 2, 1, connectionReuseEnabled);
            ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);
            try {
                ArrayList<Future<NettyPartitionRequestClient>> futures = new ArrayList<>();
                for (int i = 0; i < 10; ++i) {
                    // Block-bodied lambda with a return value: unambiguously a Callable.
                    Future<NettyPartitionRequestClient> future = threadPoolExecutor.submit(() -> {
                        return factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
                    });
                    futures.add(future);
                }
                for (Future<NettyPartitionRequestClient> future : futures) {
                    Assertions.assertThat(future.get()).isNotNull();
                }
            } finally {
                // shutdownNow() also interrupts tasks still running after a failure above.
                threadPoolExecutor.shutdownNow();
            }
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Starts a server/client pair through {@link NettyTestUtil#initServerAndClient} with a
     * protocol that contributes no server-side handlers and a single mocked {@link
     * NetworkClientHandler} on the client side.
     *
     * @return the initialized server/client pair; callers are responsible for shutting it down
     * @throws Exception if initialization fails
     */
    private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
        return NettyTestUtil.initServerAndClient(new NettyProtocol(null, null) {

            @Override
            public ChannelHandler[] getServerChannelHandlers() {
                // Empty array = "no handlers". The previous ten-slot array consisted solely of
                // null entries, which are not valid handlers.
                return new ChannelHandler[0];
            }

            @Override
            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[] {(ChannelHandler) Mockito.mock(NetworkClientHandler.class)};
            }
        });
    }

    private static class ChannelInactiveFutureHandler
    extends CreditBasedPartitionRequestClientHandler {
        private final CompletableFuture<Void> inactiveFuture;

        private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) {
            this.inactiveFuture = inactiveFuture;
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            this.inactiveFuture.complete(null);
        }

        public CompletableFuture<Void> getInactiveFuture() {
            return this.inactiveFuture;
        }
    }

    private static class AwaitingNettyClient
    extends NettyClient {
        private volatile boolean awaitForInterrupts;
        private final NettyClient client;

        AwaitingNettyClient(NettyClient client) {
            super(null);
            this.client = client;
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (this.awaitForInterrupts) {
                return new NeverCompletingChannelFuture();
            }
            try {
                return this.client.connect(serverSocketAddress);
            }
            catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        }
    }

    private static class FailingNettyClient
    extends NettyClient {
        FailingNettyClient() {
            super(null);
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            throw new ChannelException("Simulate connect failure");
        }
    }

    private static class UnstableNettyClient
    extends NettyClient {
        private final NettyClient nettyClient;
        private int retry;

        UnstableNettyClient(NettyClient nettyClient, int retry) {
            super(null);
            this.nettyClient = nettyClient;
            this.retry = retry;
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (this.retry > 0) {
                --this.retry;
                throw new ChannelException("Simulate connect failure");
            }
            return this.nettyClient.connect(serverSocketAddress);
        }
    }
}

