/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.Call;
import org.apache.hadoop.hbase.ipc.CallCancelledException;
import org.apache.hadoop.hbase.ipc.ConnectionId;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.FatalConnectionException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.hbase.ipc.RpcConnection;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
class BlockingRpcConnection
extends RpcConnection
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);
    private final BlockingRpcClient rpcClient;
    private final String threadName;
    @SuppressWarnings(value={"IS2_INCONSISTENT_SYNC"}, justification="We are always under lock actually")
    private Thread thread;
    protected Socket socket = null;
    private DataInputStream in;
    private DataOutputStream out;
    private HBaseSaslRpcClient saslRpcClient;
    private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>();
    private final CallSender callSender;
    private boolean closed = false;
    private byte[] connectionHeaderPreamble;
    private byte[] connectionHeaderWithLength;
    private boolean waitingConnectionHeaderResponse = false;

    BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
        super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
        this.rpcClient = rpcClient;
        if (remoteId.getAddress().isUnresolved()) {
            throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
        }
        this.connectionHeaderPreamble = this.getConnectionHeaderPreamble();
        RPCProtos.ConnectionHeader header = this.getConnectionHeader();
        ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(header.getSerializedSize());
        header.writeTo(dos);
        assert (baos.size() == 4 + header.getSerializedSize());
        this.connectionHeaderWithLength = baos.getBuffer();
        UserGroupInformation ticket = remoteId.ticket.getUGI();
        this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() + ") connection to " + remoteId.getAddress().toString() + (ticket == null ? " from an unknown user" : " from " + ticket.getUserName());
        if (this.rpcClient.conf.getBoolean("hbase.ipc.client.specificThreadForWriting", false)) {
            this.callSender = new CallSender(this.threadName, this.rpcClient.conf);
            this.callSender.start();
        } else {
            this.callSender = null;
        }
    }

    protected void setupConnection() throws IOException {
        int ioFailures = 0;
        int timeoutFailures = 0;
        while (true) {
            try {
                this.socket = this.rpcClient.socketFactory.createSocket();
                this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
                this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
                if (this.rpcClient.localAddr != null) {
                    this.socket.bind(this.rpcClient.localAddr);
                }
                NetUtils.connect(this.socket, this.remoteId.getAddress(), this.rpcClient.connectTO);
                this.socket.setSoTimeout(this.rpcClient.readTO);
                return;
            }
            catch (SocketTimeoutException toe) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
                }
                int n = timeoutFailures;
                timeoutFailures = (short)(timeoutFailures + 1);
                this.handleConnectionFailure(n, this.rpcClient.maxRetries, toe);
                continue;
            }
            catch (IOException ie) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
                }
                int n = ioFailures;
                ioFailures = (short)(ioFailures + 1);
                this.handleConnectionFailure(n, this.rpcClient.maxRetries, ie);
                continue;
            }
            break;
        }
    }

    private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) throws IOException {
        this.closeSocket();
        if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
            throw ioe;
        }
        try {
            Thread.sleep(this.rpcClient.failureSleep);
        }
        catch (InterruptedException ie) {
            ExceptionUtil.rethrowIfInterrupt(ie);
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Retrying connect to server: " + this.remoteId.getAddress() + " after sleeping " + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
        }
    }

    private synchronized boolean waitForWork() {
        long waitUntil = EnvironmentEdgeManager.currentTime() + (long)this.rpcClient.minIdleTimeBeforeClose;
        while (this.thread != null) {
            if (!this.calls.isEmpty()) {
                return true;
            }
            if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
                this.closeConn(new IOException("idle connection closed with " + this.calls.size() + " pending request(s)"));
                return false;
            }
            try {
                this.wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
            }
            catch (InterruptedException interruptedException) {
            }
        }
        return false;
    }

    @Override
    public void run() {
        if (LOG.isTraceEnabled()) {
            LOG.trace(this.threadName + ": starting, connections " + this.rpcClient.connections.size());
        }
        while (this.waitForWork()) {
            this.readResponse();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace(this.threadName + ": stopped, connections " + this.rpcClient.connections.size());
        }
    }

    private void disposeSasl() {
        if (this.saslRpcClient != null) {
            this.saslRpcClient.dispose();
            this.saslRpcClient = null;
        }
    }

    private boolean setupSaslConnection(InputStream in2, OutputStream out2) throws IOException {
        this.saslRpcClient = new HBaseSaslRpcClient(this.authMethod, this.token, this.serverPrincipal, this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
        return this.saslRpcClient.saslConnect(in2, out2);
    }

    private void handleSaslConnectionFailure(final int currRetries, final int maxRetries, final Exception ex, UserGroupInformation user) throws IOException, InterruptedException {
        this.closeSocket();
        user.doAs(new PrivilegedExceptionAction<Object>(){

            @Override
            public Object run() throws IOException, InterruptedException {
                if (BlockingRpcConnection.this.shouldAuthenticateOverKrb()) {
                    if (currRetries < maxRetries) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Exception encountered while connecting to the server : " + StringUtils.stringifyException(ex));
                        }
                        BlockingRpcConnection.this.relogin();
                        BlockingRpcConnection.this.disposeSasl();
                        Thread.sleep(ThreadLocalRandom.current().nextInt(BlockingRpcConnection.this.reloginMaxBackoff) + 1);
                        return null;
                    }
                    String msg = "Couldn't setup connection for " + UserGroupInformation.getLoginUser().getUserName() + " to " + BlockingRpcConnection.this.serverPrincipal;
                    LOG.warn(msg, (Throwable)ex);
                    throw (IOException)new IOException(msg).initCause(ex);
                }
                LOG.warn("Exception encountered while connecting to the server : " + ex);
                if (ex instanceof RemoteException) {
                    throw (RemoteException)ex;
                }
                if (ex instanceof SaslException) {
                    String msg = "SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.";
                    LOG.error(HBaseMarkers.FATAL, msg, (Throwable)ex);
                    throw new RuntimeException(msg, ex);
                }
                throw new IOException(ex);
            }
        });
    }

    private void setupIOstreams() throws IOException {
        if (this.socket != null) {
            return;
        }
        if (this.rpcClient.failedServers.isFailedServer(this.remoteId.getAddress())) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Not trying to connect to " + this.remoteId.address + " this server is in the failed servers list");
            }
            throw new FailedServerException("This server is in the failed servers list: " + this.remoteId.address);
        }
        try {
            OutputStream outStream;
            InputStream inStream;
            block12: {
                boolean continueSasl;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Connecting to " + this.remoteId.address);
                }
                int numRetries = 0;
                int MAX_RETRIES = 5;
                while (true) {
                    this.setupConnection();
                    inStream = NetUtils.getInputStream(this.socket);
                    outStream = NetUtils.getOutputStream(this.socket, this.rpcClient.writeTO);
                    this.writeConnectionHeaderPreamble(outStream);
                    if (!this.useSasl) break block12;
                    final SocketInputWrapper in2 = inStream;
                    final OutputStream out2 = outStream;
                    UserGroupInformation ticket = this.getUGI();
                    if (ticket == null) {
                        throw new FatalConnectionException("ticket/user is null");
                    }
                    try {
                        continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>(){

                            @Override
                            public Boolean run() throws IOException {
                                return BlockingRpcConnection.this.setupSaslConnection(in2, out2);
                            }
                        });
                    }
                    catch (Exception ex) {
                        ExceptionUtil.rethrowIfInterrupt(ex);
                        int n = numRetries;
                        numRetries = (short)(numRetries + 1);
                        this.handleSaslConnectionFailure(n, 5, ex, ticket);
                        continue;
                    }
                    break;
                }
                if (continueSasl) {
                    inStream = this.saslRpcClient.getInputStream();
                    outStream = this.saslRpcClient.getOutputStream();
                }
            }
            this.in = new DataInputStream(new BufferedInputStream(inStream));
            this.out = new DataOutputStream(new BufferedOutputStream(outStream));
            this.writeConnectionHeader();
            this.processResponseForConnectionHeader();
        }
        catch (Throwable t) {
            this.closeSocket();
            IOException e = ExceptionUtil.asInterrupt(t);
            if (e == null) {
                this.rpcClient.failedServers.addToFailedServers(this.remoteId.address, t);
                e = t instanceof LinkageError ? new DoNotRetryIOException(t) : (t instanceof IOException ? (IOException)t : new IOException("Could not set up IO Streams to " + this.remoteId.address, t));
            }
            throw e;
        }
        this.thread = new Thread((Runnable)this, this.threadName);
        this.thread.setDaemon(true);
        this.thread.start();
    }

    private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
        out.write(this.connectionHeaderPreamble);
        out.flush();
    }

    private void writeConnectionHeader() throws IOException {
        boolean isCryptoAesEnable = false;
        if (this.saslRpcClient != null) {
            boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(this.saslRpcClient.getSaslQOP());
            boolean bl = isCryptoAesEnable = saslEncryptionEnabled && this.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
        }
        if (isCryptoAesEnable) {
            this.waitingConnectionHeaderResponse = true;
        }
        this.out.write(this.connectionHeaderWithLength);
        this.out.flush();
    }

    private void processResponseForConnectionHeader() throws IOException {
        if (!this.waitingConnectionHeaderResponse) {
            return;
        }
        try {
            RPCProtos.ConnectionHeaderResponse connectionHeaderResponse;
            int len = this.in.readInt();
            byte[] buff = new byte[len];
            int readSize = this.in.read(buff);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Length of response for connection header:" + readSize);
            }
            if ((connectionHeaderResponse = RPCProtos.ConnectionHeaderResponse.parseFrom(buff)).hasCryptoCipherMeta()) {
                this.negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
            }
            this.waitingConnectionHeaderResponse = false;
        }
        catch (SocketTimeoutException ste) {
            LOG.error(HBaseMarkers.FATAL, "Can't get the connection header response for rpc timeout, please check if server has the correct configuration to support the additional function.", (Throwable)ste);
            throw new IOException("Timeout while waiting connection header response", ste);
        }
    }

    private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
        this.saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
        this.in = new DataInputStream(new BufferedInputStream(this.saslRpcClient.getInputStream()));
        this.out = new DataOutputStream(new BufferedOutputStream(this.saslRpcClient.getOutputStream()));
    }

    private void tracedWriteRequest(Call call) throws IOException {
        try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest", call.span);){
            this.writeRequest(call);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeRequest(Call call) throws IOException {
        ReferenceCounted cellBlock = null;
        try {
            cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells, PooledByteBufAllocator.DEFAULT);
            RPCProtos.CellBlockMeta cellBlockMeta = cellBlock != null ? RPCProtos.CellBlockMeta.newBuilder().setLength(((ByteBuf)cellBlock).readableBytes()).build() : null;
            RPCProtos.RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
            this.setupIOstreams();
            if (Thread.interrupted()) {
                throw new InterruptedIOException();
            }
            this.calls.put(call.id, call);
            try {
                call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, requestHeader, call.param, (ByteBuf)cellBlock));
            }
            catch (Throwable t) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Error while writing call, call_id:" + call.id, t);
                }
                IOException e = IPCUtil.toIOE(t);
                this.closeConn(e);
                if (cellBlock != null) {
                    cellBlock.release();
                }
                return;
            }
        }
        finally {
            if (cellBlock != null) {
                cellBlock.release();
            }
        }
        this.notifyAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readResponse() {
        block16: {
            Call call = null;
            boolean expectedCall = false;
            try {
                int totalSize = this.in.readInt();
                RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(this.in);
                int id = responseHeader.getCallId();
                call = (Call)this.calls.remove(id);
                boolean bl = expectedCall = call != null && !call.isDone();
                if (!expectedCall) {
                    int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
                    int whatIsLeftToRead = totalSize - readSoFar;
                    IOUtils.skipFully(this.in, whatIsLeftToRead);
                    if (call != null) {
                        call.callStats.setResponseSizeBytes(totalSize);
                        call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
                    }
                    return;
                }
                if (responseHeader.hasException()) {
                    RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
                    RemoteException re = IPCUtil.createRemoteException(exceptionResponse);
                    call.setException(re);
                    call.callStats.setResponseSizeBytes(totalSize);
                    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
                    if (!IPCUtil.isFatalConnectionException(exceptionResponse)) break block16;
                    BlockingRpcConnection blockingRpcConnection = this;
                    synchronized (blockingRpcConnection) {
                        this.closeConn(re);
                        break block16;
                    }
                }
                Message value = null;
                if (call.responseDefaultType != null) {
                    Message.Builder builder = call.responseDefaultType.newBuilderForType();
                    ProtobufUtil.mergeDelimitedFrom(builder, this.in);
                    value = builder.build();
                }
                CellScanner cellBlockScanner = null;
                if (responseHeader.hasCellBlockMeta()) {
                    int size = responseHeader.getCellBlockMeta().getLength();
                    byte[] cellBlock = new byte[size];
                    IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
                    cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
                }
                call.setResponse(value, cellBlockScanner);
                call.callStats.setResponseSizeBytes(totalSize);
                call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
            }
            catch (IOException e) {
                if (expectedCall) {
                    call.setException(e);
                }
                if (e instanceof SocketTimeoutException) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("ignored", (Throwable)e);
                    }
                }
                BlockingRpcConnection blockingRpcConnection = this;
                synchronized (blockingRpcConnection) {
                    this.closeConn(e);
                }
            }
        }
    }

    @Override
    protected synchronized void callTimeout(Call call) {
        this.calls.remove(call.id);
    }

    private void closeSocket() {
        IOUtils.closeStream(this.out);
        IOUtils.closeStream(this.in);
        IOUtils.closeSocket(this.socket);
        this.out = null;
        this.in = null;
        this.socket = null;
    }

    private void closeConn(IOException e) {
        if (this.thread == null) {
            return;
        }
        this.thread.interrupt();
        this.thread = null;
        this.closeSocket();
        if (this.callSender != null) {
            this.callSender.cleanup(e);
        }
        for (Call call : this.calls.values()) {
            call.setException(e);
        }
        this.calls.clear();
    }

    @Override
    public synchronized void shutdown() {
        this.closed = true;
        if (this.callSender != null) {
            this.callSender.interrupt();
        }
        this.closeConn(new IOException("connection to " + this.remoteId.address + " closed"));
    }

    @Override
    public void cleanupConnection() {
    }

    @Override
    public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) throws IOException {
        pcrc.notifyOnCancel(new RpcCallback<Object>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run(Object parameter) {
                IPCUtil.setCancelled(call);
                BlockingRpcConnection blockingRpcConnection = BlockingRpcConnection.this;
                synchronized (blockingRpcConnection) {
                    if (BlockingRpcConnection.this.callSender != null) {
                        BlockingRpcConnection.this.callSender.remove(call);
                    } else {
                        BlockingRpcConnection.this.calls.remove(call.id);
                    }
                }
            }
        }, new HBaseRpcController.CancellationCallback(){

            @Override
            public void run(boolean cancelled) throws IOException {
                if (cancelled) {
                    IPCUtil.setCancelled(call);
                    return;
                }
                BlockingRpcConnection.this.scheduleTimeoutTask(call);
                if (BlockingRpcConnection.this.callSender != null) {
                    BlockingRpcConnection.this.callSender.sendCall(call);
                } else {
                    BlockingRpcConnection.this.tracedWriteRequest(call);
                }
            }
        });
    }

    @Override
    public synchronized boolean isActive() {
        return this.thread != null;
    }

    private class CallSender
    extends Thread {
        private final Queue<Call> callsToWrite;
        private final int maxQueueSize;

        public CallSender(String name, Configuration conf) {
            int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
            this.callsToWrite = new ArrayDeque<Call>(queueSize);
            this.maxQueueSize = queueSize;
            this.setDaemon(true);
            this.setName(name + " - writer");
        }

        public void sendCall(Call call) throws IOException {
            if (this.callsToWrite.size() >= this.maxQueueSize) {
                throw new IOException("Can't add the call " + call.id + " to the write queue. callsToWrite.size()=" + this.callsToWrite.size());
            }
            this.callsToWrite.offer(call);
            BlockingRpcConnection.this.notifyAll();
        }

        public void remove(Call call) {
            this.callsToWrite.remove(call);
            BlockingRpcConnection.this.calls.remove(call.id);
            call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" + call.timeout));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            BlockingRpcConnection blockingRpcConnection = BlockingRpcConnection.this;
            synchronized (blockingRpcConnection) {
                while (!BlockingRpcConnection.this.closed) {
                    if (this.callsToWrite.isEmpty()) {
                        try {
                            BlockingRpcConnection.this.wait();
                        }
                        catch (InterruptedException interruptedException) {}
                        continue;
                    }
                    Call call = this.callsToWrite.poll();
                    if (call.isDone()) continue;
                    try {
                        BlockingRpcConnection.this.tracedWriteRequest(call);
                    }
                    catch (IOException e) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("call write error for call #" + call.id, (Throwable)e);
                        }
                        call.setException(e);
                        BlockingRpcConnection.this.closeConn(e);
                    }
                }
                return;
            }
        }

        public void cleanup(IOException e) {
            ConnectionClosingException ie = new ConnectionClosingException("Connection to " + BlockingRpcConnection.this.remoteId.address + " is closing.");
            for (Call call : this.callsToWrite) {
                call.setException(ie);
            }
            this.callsToWrite.clear();
        }
    }
}

