package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Daemon thread that writes RPC responses back to clients without blocking.
 *
 * <p>Callers hand a response to {@link #doRespond}. When nothing is queued for the connection and
 * the connection's write lock is free, the response is written directly on the calling thread.
 * Otherwise (or when the socket could not take the whole response) the response is queued on the
 * connection and the connection is registered with this thread's write {@link Selector}; the run
 * loop then drains the queue whenever the channel becomes writable.
 *
 * <p>After each select that returned ready keys, and at most once per {@code purgeTimeout}, the
 * loop also closes connections whose pending response has been waiting longer than
 * {@code purgeTimeout}.
 */
@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.class */
class SimpleRpcServerResponder extends Thread {
    /** Upper bound on responses drained for one connection per writable event. */
    private static final int MAX_RESPONSES_PER_PASS = 20;

    private final SimpleRpcServer simpleRpcServer;
    private final Selector writeSelector;

    /** Connections waiting to be (re-)registered for OP_WRITE by the responder thread. */
    private final Set<SimpleServerRpcConnection> writingCons =
        Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>());

    /**
     * Creates the responder thread (not started) and opens its write selector.
     *
     * @param simpleRpcServer the owning server; its {@code running}, {@code purgeTimeout} and
     *     {@code errorHandler} fields are read by the run loop
     * @throws IOException if the selector cannot be opened
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
        this.simpleRpcServer = simpleRpcServer;
        setName("RpcServer.responder");
        setDaemon(true);
        setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
        this.writeSelector = Selector.open();
    }

    /**
     * Runs the responder loop and always closes the write selector on the way out, whether the
     * loop ended normally or by throwing.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        SimpleRpcServer.LOG.debug(getName() + ": starting");
        try {
            doRunLoop();
        } finally {
            SimpleRpcServer.LOG.info(getName() + ": stopping");
            try {
                this.writeSelector.close();
            } catch (IOException e) {
                SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", e);
            }
        }
    }

    /**
     * Drains {@link #writingCons}, making sure each connection has OP_WRITE interest on the write
     * selector. Connections whose channel is closed or whose key is cancelled are skipped.
     */
    private void registerWrites() {
        Iterator<SimpleServerRpcConnection> it = this.writingCons.iterator();
        while (it.hasNext()) {
            SimpleServerRpcConnection conn = it.next();
            it.remove();
            SelectionKey key = conn.channel.keyFor(this.writeSelector);
            // The CancelledKeyException guard covers both branches: updating the interest set of
            // an already-cancelled key must not abort the pass for the remaining connections.
            try {
                if (key == null) {
                    try {
                        conn.channel.register(this.writeSelector, SelectionKey.OP_WRITE, conn);
                    } catch (ClosedChannelException e) {
                        if (SimpleRpcServer.LOG.isTraceEnabled()) {
                            SimpleRpcServer.LOG.trace("ignored", e);
                        }
                    }
                } else {
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            } catch (CancelledKeyException e) {
                if (SimpleRpcServer.LOG.isTraceEnabled()) {
                    SimpleRpcServer.LOG.trace("ignored", e);
                }
            }
        }
    }

    /**
     * Queues the connection for OP_WRITE registration and wakes the selector so the responder
     * thread picks it up. The wakeup is skipped when the connection was already queued.
     *
     * @param simpleServerRpcConnection connection that has responses left to write
     */
    public void registerForWrite(SimpleServerRpcConnection simpleServerRpcConnection) {
        if (this.writingCons.add(simpleServerRpcConnection)) {
            this.writeSelector.wakeup();
        }
    }

    /**
     * Main loop: register pending connections, wait for writable channels, write queued responses
     * and purge stale connections, until the server's {@code running} flag is cleared.
     */
    private void doRunLoop() {
        long lastPurgeTime = 0;
        while (this.simpleRpcServer.running) {
            try {
                registerWrites();
                if (this.writeSelector.select(this.simpleRpcServer.purgeTimeout) == 0) {
                    // Timed out or woken up with nothing ready; go back to pick up registrations.
                    continue;
                }
                Iterator<SelectionKey> it = this.writeSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    try {
                        if (key.isValid() && key.isWritable()) {
                            doAsyncWrite(key);
                        }
                    } catch (IOException e) {
                        SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
                    }
                }
                lastPurgeTime = purge(lastPurgeTime);
            } catch (Exception e) {
                SimpleRpcServer.LOG.warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
            } catch (OutOfMemoryError e) {
                if (this.simpleRpcServer.errorHandler == null) {
                    // No handler installed: log, back off for a minute, then retry the loop.
                    SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
                    try {
                        Thread.sleep(60000L);
                    } catch (InterruptedException ie) {
                        SimpleRpcServer.LOG.debug("Interrupted while sleeping");
                        return;
                    }
                } else if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
                    SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
                    return;
                }
            }
        }
        SimpleRpcServer.LOG.info(getName() + ": stopped");
    }

    /**
     * Closes connections whose pending response has been waiting longer than {@code purgeTimeout}.
     * Does nothing if less than {@code purgeTimeout} has elapsed since the previous purge.
     *
     * @param j time of the previous purge, as returned by the previous call
     * @return the time of the last purge: {@code j} if skipped, otherwise the current time
     * @throws IllegalStateException if a registered key has no connection attached
     */
    private long purge(long j) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now < j + this.simpleRpcServer.purgeTimeout) {
            return j;
        }
        ArrayList<SimpleServerRpcConnection> stale = new ArrayList<SimpleServerRpcConnection>();
        // Collect under the key-set monitor, close afterwards outside of it.
        synchronized (this.writeSelector.keys()) {
            for (SelectionKey key : this.writeSelector.keys()) {
                SimpleServerRpcConnection conn = (SimpleServerRpcConnection) key.attachment();
                if (conn == null) {
                    throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
                }
                if (conn.lastSentTime > 0 && now > conn.lastSentTime + this.simpleRpcServer.purgeTimeout) {
                    stale.add(conn);
                }
            }
        }
        for (SimpleServerRpcConnection conn : stale) {
            this.simpleRpcServer.closeConnection(conn);
        }
        return now;
    }

    /**
     * Writes queued responses for the connection attached to a writable key, and clears the key's
     * interest set once the connection has nothing left to write.
     *
     * @param selectionKey a valid, writable key from the write selector
     * @throws IOException if the key has no attachment, the attachment's channel does not match
     *     the key's channel, or writing fails
     */
    private void doAsyncWrite(SelectionKey selectionKey) throws IOException {
        SimpleServerRpcConnection conn = (SimpleServerRpcConnection) selectionKey.attachment();
        if (conn == null) {
            throw new IOException("doAsyncWrite: no connection");
        }
        if (selectionKey.channel() != conn.channel) {
            throw new IOException("doAsyncWrite: bad channel");
        }
        if (processAllResponses(conn)) {
            try {
                // Everything was sent; stop selecting on this key until more work is registered.
                selectionKey.interestOps(0);
            } catch (CancelledKeyException e) {
                SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
            }
        }
    }

    /**
     * Writes as much of one response as the channel accepts.
     *
     * <p>If anything in this method throws, the response is marked done, the connection is closed
     * and the failure is rethrown.
     *
     * @param simpleServerRpcConnection connection to write on
     * @param rpcResponse response to send
     * @return {@code true} if the response was fully written (and marked done); {@code false} if
     *     bytes remain, in which case the connection's {@code lastSentTime} is updated
     * @throws IOException if the channel write reports an error
     */
    private boolean processResponse(SimpleServerRpcConnection simpleServerRpcConnection, RpcResponse rpcResponse) throws IOException {
        BufferChain response = rpcResponse.getResponse();
        try {
            if (this.simpleRpcServer.channelWrite(simpleServerRpcConnection.channel, response) < 0) {
                throw new HBaseIOException("Error writing on the socket " + simpleServerRpcConnection);
            }
            if (response.hasRemaining()) {
                // Socket could not take everything; remember when we started waiting.
                simpleServerRpcConnection.lastSentTime = EnvironmentEdgeManager.currentTime();
                return false;
            }
            rpcResponse.done();
            return true;
        } catch (Throwable th) {
            SimpleRpcServer.LOG.debug(simpleServerRpcConnection + ": output error -- closing");
            rpcResponse.done();
            this.simpleRpcServer.closeConnection(simpleServerRpcConnection);
            throw th;
        }
    }

    /**
     * Drains up to {@link #MAX_RESPONSES_PER_PASS} queued responses for a connection while holding
     * its write lock for the whole pass. The lock is released exactly once, in {@code finally}.
     *
     * @param simpleServerRpcConnection connection whose response queue is drained
     * @return {@code true} if the queue is empty afterwards; {@code false} if a response was only
     *     partially written (it is put back at the head) or responses remain after the cap
     * @throws IOException if writing a response fails
     */
    private boolean processAllResponses(SimpleServerRpcConnection simpleServerRpcConnection) throws IOException {
        simpleServerRpcConnection.responseWriteLock.lock();
        try {
            for (int i = 0; i < MAX_RESPONSES_PER_PASS; i++) {
                RpcResponse response = simpleServerRpcConnection.responseQueue.pollFirst();
                if (response == null) {
                    return true;
                }
                if (!processResponse(simpleServerRpcConnection, response)) {
                    // Partially written: keep it first in line for the next writable event.
                    simpleServerRpcConnection.responseQueue.addFirst(response);
                    return false;
                }
            }
        } finally {
            simpleServerRpcConnection.responseWriteLock.unlock();
        }
        return simpleServerRpcConnection.responseQueue.isEmpty();
    }

    /**
     * Sends a response, writing it directly on the calling thread when possible.
     *
     * <p>A direct write is attempted only if the connection's queue is empty and its write lock
     * can be taken without waiting. If the response is not fully written, or no direct write was
     * attempted, the response is queued and the connection is registered for asynchronous write.
     *
     * @param simpleServerRpcConnection connection to respond on
     * @param rpcResponse response to send
     * @throws IOException if the direct write fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void doRespond(SimpleServerRpcConnection simpleServerRpcConnection, RpcResponse rpcResponse) throws IOException {
        boolean queued = false;
        if (simpleServerRpcConnection.responseQueue.isEmpty() && simpleServerRpcConnection.responseWriteLock.tryLock()) {
            try {
                // Re-check under the lock: another thread may have queued something meanwhile.
                if (simpleServerRpcConnection.responseQueue.isEmpty()) {
                    if (processResponse(simpleServerRpcConnection, rpcResponse)) {
                        return;
                    }
                    // Partially written: it must go out before anything queued later.
                    simpleServerRpcConnection.responseQueue.addFirst(rpcResponse);
                    queued = true;
                }
            } finally {
                simpleServerRpcConnection.responseWriteLock.unlock();
            }
        }
        if (!queued) {
            simpleServerRpcConnection.responseQueue.addLast(rpcResponse);
        }
        registerForWrite(simpleServerRpcConnection);
    }
}
