/*
 * Decompiled with CFR 0.152.
 */
package org.apache.omid.tso;

import com.google.protobuf.MessageLite;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.omid.NetworkUtils;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.MonitoringContext;
import org.apache.omid.tso.RequestProcessor;
import org.apache.omid.tso.TSOChannelHandler;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link TSOChannelHandler} that exercise it through real sockets:
 * the listening channel lifecycle (reconnect / closeConnection / close), client
 * connections made with a Netty {@link Bootstrap}, and dispatching of protobuf
 * requests to a mocked {@link RequestProcessor}.
 */
public class TestTSOChannelHandlerNetty {
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);

    /** Pause between two polls of the handler's channel group size. */
    private static final long POLL_INTERVAL_MS = 10L;

    /** Time budget, in milliseconds, given to each Mockito verification. */
    private static final long VERIFY_WAIT_MS = 100L;

    @Mock
    private RequestProcessor requestProcessor;

    /** Re-creates the {@code @Mock} fields before every test method. */
    @BeforeMethod
    public void beforeTestMethod() {
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Builds a handler configured for the given port, wired to the mocked
     * request processor and a {@link NullMetricsProvider}.
     *
     * @param port TCP port set on the {@link TSOServerConfig}
     * @return a new, not yet connected, channel handler
     */
    private TSOChannelHandler getTSOChannelHandler(int port) {
        TSOServerConfig config = new TSOServerConfig();
        config.setPort(port);
        return new TSOChannelHandler(config, this.requestProcessor, new NullMetricsProvider());
    }

    /**
     * Walks the handler through its public lifecycle and checks the state of the
     * listening channel and of the channel group after every step, including
     * repeated calls and the attempt to reconnect after {@code close()}.
     */
    @Test(timeOut=10000L)
    public void testMainAPI() throws Exception {
        int port = NetworkUtils.getFreePort();
        try (TSOChannelHandler channelHandler = this.getTSOChannelHandler(port)) {
            // Initial state: nothing has been created yet
            Assert.assertNull(channelHandler.listeningChannel);
            Assert.assertNull(channelHandler.allChannels);

            // First connection
            channelHandler.reconnect();
            Assert.assertTrue(channelHandler.listeningChannel.isOpen());
            Assert.assertEquals(channelHandler.allChannels.size(), 1);
            Assert.assertEquals(((InetSocketAddress) channelHandler.listeningChannel.localAddress()).getPort(), port);

            // Closing the connection
            channelHandler.closeConnection();
            Assert.assertFalse(channelHandler.listeningChannel.isOpen());
            Assert.assertEquals(channelHandler.allChannels.size(), 0);

            // Closing again leaves the same state
            channelHandler.closeConnection();
            Assert.assertFalse(channelHandler.listeningChannel.isOpen());
            Assert.assertEquals(channelHandler.allChannels.size(), 0);

            // Reconnecting after closeConnection()
            channelHandler.reconnect();
            Assert.assertTrue(channelHandler.listeningChannel.isOpen());
            Assert.assertEquals(channelHandler.allChannels.size(), 1);

            // Reconnecting while already connected
            channelHandler.reconnect();
            Assert.assertTrue(channelHandler.listeningChannel.isOpen());
            Assert.assertEquals(channelHandler.allChannels.size(), 1);

            // Closing the handler itself
            channelHandler.close();
            Assert.assertFalse(channelHandler.listeningChannel.isOpen());
            Assert.assertEquals(channelHandler.allChannels.size(), 0);

            // A closed handler is expected to refuse to reconnect. Assert.fail()
            // throws an AssertionError, which the catch below does not intercept.
            try {
                channelHandler.reconnect();
                Assert.fail("Can't reconnect after closing");
            } catch (Exception e) {
                Assert.assertFalse(channelHandler.listeningChannel.isOpen());
                Assert.assertEquals(channelHandler.allChannels.size(), 0);
            }
        }
    }

    /**
     * Connects a Netty client to the handler before and after it starts
     * listening, and checks that client channels are added to / removed from the
     * channel group and are closed when the handler closes or reconnects.
     */
    @Test(timeOut=10000L)
    public void testNettyConnectionToTSOFromClient() throws Exception {
        int port = NetworkUtils.getFreePort();
        try (TSOChannelHandler channelHandler = this.getTSOChannelHandler(port)) {
            Bootstrap nettyClient = this.createNettyClientBootstrap();
            try {
                // Nothing is listening yet, so the connection attempt must fail
                ChannelFuture channelF = connect(nettyClient, port);
                Assert.assertFalse(channelF.isSuccess());

                // Start listening and connect for real
                channelHandler.reconnect();
                Assert.assertTrue(channelHandler.listeningChannel.isOpen());
                Assert.assertEquals(channelHandler.allChannels.size(), 1);
                channelF = connect(nettyClient, port);
                Assert.assertTrue(channelF.isSuccess());
                Assert.assertTrue(channelF.channel().isActive());
                // Listening channel + the client connection
                awaitChannelCount(channelHandler, 2);

                // Closing the client side removes its channel from the group
                channelF.channel().close().await();
                awaitChannelCount(channelHandler, 1);

                // Connect again, then close the connection from the server side
                channelF = connect(nettyClient, port);
                Assert.assertTrue(channelF.isSuccess());
                awaitChannelCount(channelHandler, 2);
                channelHandler.closeConnection();
                Assert.assertFalse(channelHandler.listeningChannel.isOpen());
                Assert.assertEquals(channelHandler.allChannels.size(), 0);
                // Give the client side time to observe the close
                TimeUnit.SECONDS.sleep(1L);
                Assert.assertFalse(channelF.channel().isOpen());

                // Reconnect the server and attach a new client
                channelHandler.reconnect();
                Assert.assertTrue(channelHandler.listeningChannel.isOpen());
                Assert.assertEquals(channelHandler.allChannels.size(), 1);
                channelF = connect(nettyClient, port);
                Assert.assertTrue(channelF.isSuccess());
                awaitChannelCount(channelHandler, 2);

                // A reconnect with a client attached must drop that client
                channelHandler.reconnect();
                Assert.assertTrue(channelHandler.listeningChannel.isOpen());
                Assert.assertEquals(channelHandler.allChannels.size(), 1);
                TimeUnit.SECONDS.sleep(1L);
                Assert.assertFalse(channelF.channel().isOpen());

                // Finally close the handler
                channelHandler.close();
                Assert.assertFalse(channelHandler.listeningChannel.isOpen());
                Assert.assertEquals(channelHandler.allChannels.size(), 0);
            } finally {
                shutdownClient(nettyClient);
            }
        }
    }

    /**
     * Sends a handshake followed by timestamp, commit and fence requests over a
     * client channel and checks that each one reaches the matching method of the
     * mocked {@link RequestProcessor}.
     */
    @Test(timeOut=10000L)
    public void testNettyChannelWriting() throws Exception {
        int port = NetworkUtils.getFreePort();
        try (TSOChannelHandler channelHandler = this.getTSOChannelHandler(port)) {
            channelHandler.reconnect();
            Bootstrap nettyClient = this.createNettyClientBootstrap();
            try {
                ChannelFuture channelF = connect(nettyClient, port);
                Assert.assertTrue(channelF.isSuccess());
                Assert.assertTrue(channelF.channel().isActive());
                Channel channel = channelF.channel();
                // Listening channel + the client connection
                awaitChannelCount(channelHandler, 2);

                // Handshake is sent first, before any other request
                TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
                handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
                channel.writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());

                this.testWritingTimestampRequest(channel);
                this.testWritingCommitRequest(channel);
                this.testWritingFenceRequest(channel);
            } finally {
                shutdownClient(nettyClient);
            }
        }
    }

    /**
     * Writes a timestamp request and verifies it is the only call that reaches
     * the request processor.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write
     */
    private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
        Mockito.reset(this.requestProcessor);
        TSOProto.Request.Builder tsBuilder = TSOProto.Request.newBuilder();
        TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
        tsBuilder.setTimestampRequest(tsRequestBuilder.build());
        channel.writeAndFlush(tsBuilder.build()).await();
        Mockito.verify(this.requestProcessor, Mockito.timeout(VERIFY_WAIT_MS).times(1))
                .timestampRequest(ArgumentMatchers.<Channel>any(), ArgumentMatchers.any(MonitoringContext.class));
        // after() waits the whole period before checking, whereas timeout() with
        // times(0) would succeed immediately without giving a stray call a chance.
        Mockito.verify(this.requestProcessor, Mockito.after(VERIFY_WAIT_MS).never())
                .commitRequest(ArgumentMatchers.anyLong(), ArgumentMatchers.anyCollection(), ArgumentMatchers.anyCollection(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.<Channel>any(), ArgumentMatchers.any(MonitoringContext.class));
    }

    /**
     * Writes a commit request (start timestamp 666, one cell id) and verifies it
     * is dispatched as a non-retry commit and not as a timestamp request.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write
     */
    private void testWritingCommitRequest(Channel channel) throws InterruptedException {
        Mockito.reset(this.requestProcessor);
        TSOProto.Request.Builder commitBuilder = TSOProto.Request.newBuilder();
        TSOProto.CommitRequest.Builder commitRequestBuilder = TSOProto.CommitRequest.newBuilder();
        commitRequestBuilder.setStartTimestamp(666L);
        commitRequestBuilder.addCellId(666L);
        commitBuilder.setCommitRequest(commitRequestBuilder.build());
        TSOProto.Request r = commitBuilder.build();
        Assert.assertTrue(r.hasCommitRequest());
        channel.writeAndFlush(commitBuilder.build()).await();
        // Positive check first: once it passes the request has been dispatched,
        // which makes the negative check below meaningful.
        Mockito.verify(this.requestProcessor, Mockito.timeout(VERIFY_WAIT_MS).times(1))
                .commitRequest(ArgumentMatchers.eq(666L), ArgumentMatchers.anyCollection(), ArgumentMatchers.anyCollection(), ArgumentMatchers.eq(false), ArgumentMatchers.<Channel>any(), ArgumentMatchers.any(MonitoringContext.class));
        Mockito.verify(this.requestProcessor, Mockito.after(VERIFY_WAIT_MS).never())
                .timestampRequest(ArgumentMatchers.<Channel>any(), ArgumentMatchers.any(MonitoringContext.class));
    }

    /**
     * Writes a fence request for table id 666 and verifies it is dispatched as a
     * fence request and not as a timestamp request.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write
     */
    private void testWritingFenceRequest(Channel channel) throws InterruptedException {
        Mockito.reset(this.requestProcessor);
        TSOProto.Request.Builder fenceBuilder = TSOProto.Request.newBuilder();
        TSOProto.FenceRequest.Builder fenceRequestBuilder = TSOProto.FenceRequest.newBuilder();
        fenceRequestBuilder.setTableId(666L);
        fenceBuilder.setFenceRequest(fenceRequestBuilder.build());
        TSOProto.Request r = fenceBuilder.build();
        Assert.assertTrue(r.hasFenceRequest());
        channel.writeAndFlush(fenceBuilder.build()).await();
        // Positive check first, then make sure nothing else was dispatched.
        Mockito.verify(this.requestProcessor, Mockito.timeout(VERIFY_WAIT_MS).times(1))
                .fenceRequest(ArgumentMatchers.eq(666L), ArgumentMatchers.<Channel>any(), ArgumentMatchers.any(MonitoringContext.class));
        Mockito.verify(this.requestProcessor, Mockito.after(VERIFY_WAIT_MS).never())
                .timestampRequest(ArgumentMatchers.<Channel>any(), ArgumentMatchers.any(MonitoringContext.class));
    }

    /**
     * Starts a connection attempt to {@code localhost:port} and blocks until it
     * has completed, successfully or not. Blocking on the future instead of
     * spinning on {@code isDone()} keeps the CPU free for the I/O threads and
     * lets an interrupt end the wait.
     *
     * @param client bootstrap used to open the connection
     * @param port   port to connect to
     * @return the completed connect future; callers inspect {@code isSuccess()}
     * @throws InterruptedException if interrupted while waiting
     */
    private static ChannelFuture connect(Bootstrap client, int port) throws InterruptedException {
        return client.connect(new InetSocketAddress("localhost", port)).await();
    }

    /**
     * Polls until the handler's channel group holds exactly {@code expected}
     * channels, sleeping between polls. There is no internal deadline; the wait
     * is bounded by the {@code timeOut} declared on the calling test.
     *
     * @param handler  handler whose {@code allChannels} group is observed
     * @param expected number of channels to wait for
     * @throws InterruptedException if interrupted while sleeping
     */
    private static void awaitChannelCount(TSOChannelHandler handler, int expected) throws InterruptedException {
        while (handler.allChannels.size() != expected) {
            TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Releases the worker threads of a bootstrap created by
     * {@link #createNettyClientBootstrap()}. The shutdown is requested but not
     * awaited, so it does not add to the test's running time.
     *
     * @param client bootstrap whose event loop group should be shut down
     */
    private static void shutdownClient(Bootstrap client) {
        EventLoopGroup group = client.config().group();
        if (group != null) {
            group.shutdownGracefully(0L, 1L, TimeUnit.SECONDS);
        }
    }

    /**
     * Creates a client bootstrap with a single-threaded NIO event loop group and
     * a pipeline of length-field framing, protobuf encoding/decoding (decoding
     * {@link TSOProto.Response}) and a logging-only terminal handler.
     * Callers should pass the result to {@link #shutdownClient(Bootstrap)} when
     * done, since the event loop group is created here.
     *
     * @return a configured, not yet connected, client bootstrap
     */
    private Bootstrap createNettyClientBootstrap() {
        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
        EventLoopGroup workerGroup = new NioEventLoopGroup(1, workerThreadFactory);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 4));
                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
                pipeline.addLast("protobufencoder", new ProtobufEncoder());
                pipeline.addLast("testhandler", new ChannelInboundHandlerAdapter() {

                    @Override
                    public void channelActive(ChannelHandlerContext ctx) {
                        LOG.info("Channel {} active", ctx.channel());
                    }

                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        LOG.error("Error on channel {}", ctx.channel(), cause);
                    }
                });
            }
        });
        return bootstrap;
    }
}

