/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyClient;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.netty.NettyProtos;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;

public class NettyRpcProxy
implements Closeable {
    private final RaftPeer peer;
    private final Connection connection;
    private final TimeDuration requestTimeoutDuration;

    public static long getCallId(NettyProtos.RaftNettyServerReplyProto proto) {
        switch (proto.getRaftNettyServerReplyCase()) {
            case REQUESTVOTEREPLY: {
                return proto.getRequestVoteReply().getServerReply().getCallId();
            }
            case STARTLEADERELECTIONREPLY: {
                return proto.getStartLeaderElectionReply().getServerReply().getCallId();
            }
            case APPENDENTRIESREPLY: {
                return proto.getAppendEntriesReply().getServerReply().getCallId();
            }
            case INSTALLSNAPSHOTREPLY: {
                return proto.getInstallSnapshotReply().getServerReply().getCallId();
            }
            case RAFTCLIENTREPLY: {
                return proto.getRaftClientReply().getRpcReply().getCallId();
            }
            case EXCEPTIONREPLY: {
                return proto.getExceptionReply().getRpcReply().getCallId();
            }
            case RAFTNETTYSERVERREPLY_NOT_SET: {
                throw new IllegalArgumentException("Reply case not set in proto: " + proto.getRaftNettyServerReplyCase());
            }
        }
        throw new UnsupportedOperationException("Reply case not supported: " + proto.getRaftNettyServerReplyCase());
    }

    public NettyRpcProxy(RaftPeer peer, RaftProperties properties, EventLoopGroup group) throws InterruptedException {
        this.peer = peer;
        this.connection = new Connection(group);
        this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout((RaftProperties)properties);
    }

    @Override
    public void close() {
        this.connection.close();
    }

    public CompletableFuture<NettyProtos.RaftNettyServerReplyProto> sendAsync(NettyProtos.RaftNettyServerRequestProto proto) {
        CompletableFuture<NettyProtos.RaftNettyServerReplyProto> reply = new CompletableFuture<NettyProtos.RaftNettyServerReplyProto>();
        this.connection.offer(proto, reply);
        return reply;
    }

    public NettyProtos.RaftNettyServerReplyProto send(RaftProtos.RaftRpcRequestProto request, NettyProtos.RaftNettyServerRequestProto proto) throws IOException {
        CompletableFuture<NettyProtos.RaftNettyServerReplyProto> reply = new CompletableFuture<NettyProtos.RaftNettyServerReplyProto>();
        ChannelFuture channelFuture = this.connection.offer(proto, reply);
        try {
            channelFuture.sync();
            TimeDuration newDuration = this.requestTimeoutDuration.add(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
            return reply.get(newDuration.getDuration(), newDuration.getUnit());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IOUtils.toInterruptedIOException((String)(ProtoUtils.toString((RaftProtos.RaftRpcRequestProto)request) + " sending from " + this.peer + " is interrupted."), (InterruptedException)e);
        }
        catch (ExecutionException e) {
            throw IOUtils.toIOException((ExecutionException)e);
        }
        catch (TimeoutException e) {
            throw new TimeoutIOException(e.getMessage(), (Throwable)e);
        }
    }

    class Connection
    implements Closeable {
        private final NettyClient client;
        private final Queue<CompletableFuture<NettyProtos.RaftNettyServerReplyProto>> replies;

        Connection(EventLoopGroup group) throws InterruptedException {
            this.client = new NettyClient(NettyRpcProxy.this.peer.getAddress());
            this.replies = new LinkedList<CompletableFuture<NettyProtos.RaftNettyServerReplyProto>>();
            SimpleChannelInboundHandler<NettyProtos.RaftNettyServerReplyProto> inboundHandler = new SimpleChannelInboundHandler<NettyProtos.RaftNettyServerReplyProto>(){

                protected void channelRead0(ChannelHandlerContext ctx, NettyProtos.RaftNettyServerReplyProto proto) {
                    CompletableFuture<NettyProtos.RaftNettyServerReplyProto> future = Connection.this.pollReply();
                    if (future == null) {
                        throw new IllegalStateException("Request #" + NettyRpcProxy.getCallId(proto) + " not found");
                    }
                    if (proto.getRaftNettyServerReplyCase() == NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY) {
                        Object ioe = ProtoUtils.toObject((ByteString)proto.getExceptionReply().getException());
                        future.completeExceptionally((IOException)ioe);
                    } else {
                        future.complete(proto);
                    }
                }
            };
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>((ChannelInboundHandler)inboundHandler){
                final /* synthetic */ ChannelInboundHandler val$inboundHandler;
                {
                    this.val$inboundHandler = channelInboundHandler;
                }

                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new ChannelHandler[]{new ProtobufVarint32FrameDecoder()});
                    p.addLast(new ChannelHandler[]{new ProtobufDecoder((MessageLite)NettyProtos.RaftNettyServerReplyProto.getDefaultInstance())});
                    p.addLast(new ChannelHandler[]{new ProtobufVarint32LengthFieldPrepender()});
                    p.addLast(new ChannelHandler[]{new ProtobufEncoder()});
                    p.addLast(new ChannelHandler[]{this.val$inboundHandler});
                }
            };
            this.client.connect(group, initializer);
        }

        synchronized ChannelFuture offer(NettyProtos.RaftNettyServerRequestProto request, CompletableFuture<NettyProtos.RaftNettyServerReplyProto> reply) {
            this.replies.offer(reply);
            return this.client.writeAndFlush(request);
        }

        synchronized CompletableFuture<NettyProtos.RaftNettyServerReplyProto> pollReply() {
            return this.replies.poll();
        }

        @Override
        public synchronized void close() {
            this.client.close();
            if (!this.replies.isEmpty()) {
                IOException e = new IOException("Connection to " + NettyRpcProxy.this.peer + " is closed.");
                this.replies.stream().forEach(f -> f.completeExceptionally(e));
                this.replies.clear();
            }
        }
    }

    public static class PeerMap
    extends PeerProxyMap<NettyRpcProxy> {
        private final EventLoopGroup group;

        public PeerMap(String name, RaftProperties properties) {
            this(name, properties, NettyUtils.newEventLoopGroup(name, 0, NettyConfigKeys.Client.useEpoll(properties)));
        }

        private PeerMap(String name, RaftProperties properties, EventLoopGroup group) {
            super(name, peer -> {
                try {
                    return new NettyRpcProxy((RaftPeer)peer, properties, group);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw IOUtils.toInterruptedIOException((String)("Failed connecting to " + peer), (InterruptedException)e);
                }
            });
            this.group = group;
        }

        public void close() {
            super.close();
            this.group.shutdownGracefully();
        }
    }
}

