/*
 * Decompiled with CFR 0.152.
 */
package org.voltdb.client;

import com.google_voltpatches.common.net.HostAndPort;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.ConnectException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.security.auth.Subject;
import org.json_voltpatches.JSONException;
import org.json_voltpatches.JSONObject;
import org.voltcore.network.CipherExecutor;
import org.voltcore.network.Connection;
import org.voltcore.network.QueueMonitor;
import org.voltcore.network.ReverseDNSCache;
import org.voltcore.network.VoltNetworkPool;
import org.voltcore.network.VoltProtocolHandler;
import org.voltcore.network.metrics.IOStatsData;
import org.voltcore.utils.EstTime;
import org.voltdb.ClientResponseImpl;
import org.voltdb.VoltTable;
import org.voltdb.client.Client2;
import org.voltdb.client.Client2CallOptions;
import org.voltdb.client.Client2Config;
import org.voltdb.client.Client2Notification;
import org.voltdb.client.ClientAffinityStats;
import org.voltdb.client.ClientAuthScheme;
import org.voltdb.client.ClientConnectionRequestStats;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientIOStats;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientResponseWithPartitionKey;
import org.voltdb.client.ClientSslSetup;
import org.voltdb.client.ClientStats;
import org.voltdb.client.ClientStatsContext;
import org.voltdb.client.ConnectionUtil;
import org.voltdb.client.GeneralException;
import org.voltdb.client.HashinatorLite;
import org.voltdb.client.NoConnectionsException;
import org.voltdb.client.ProcCallException;
import org.voltdb.client.ProcedureInvocation;
import org.voltdb.client.RateLimiter2;
import org.voltdb.client.RequestLimitException;
import org.voltdb.client.VoltBulkLoader.BulkLoaderFailureCallBack;
import org.voltdb.client.VoltBulkLoader.BulkLoaderState;
import org.voltdb.client.VoltBulkLoader.BulkLoaderSuccessCallback;
import org.voltdb.client.VoltBulkLoader.VoltBulkLoader;
import org.voltdb.utils.Encoder;

public class Client2Impl
implements Client2 {
    private final AtomicLong handleGenerator = new AtomicLong(0L);
    private final AtomicLong sysHandleGenerator = new AtomicLong(0L);
    private static final boolean debugging = false;
    private static final boolean datadump = false;
    private final String username;
    private final byte[] passwordHash;
    private final ClientAuthScheme hashScheme;
    private final Subject subject;
    private final SslContext sslContext;
    private final boolean sslHostCheck;
    private CipherExecutor cipherService;
    private final Random randomizer = new Random();
    static final int DEFAULT_REQUEST_PRIORITY = 4;
    static final int DEFAULT_SYSREQ_PRIORITY = 2;
    private int defaultRequestPriority = 4;
    private int systemRequestPriority = 2;
    static final long DEFAULT_CONNECTION_SETUP_TIMEOUT = TimeUnit.SECONDS.toNanos(30L);
    static final long DEFAULT_CONNECTION_RESPONSE_TIMEOUT = TimeUnit.MINUTES.toNanos(2L);
    static final long DEFAULT_PROCEDURE_TIMEOUT = TimeUnit.MINUTES.toNanos(2L);
    private long connectTimeout = DEFAULT_CONNECTION_SETUP_TIMEOUT;
    private long connectionResponseTimeout = DEFAULT_CONNECTION_RESPONSE_TIMEOUT;
    private long procedureCallTimeout = DEFAULT_PROCEDURE_TIMEOUT;
    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1L);
    private final AtomicReference<HashinatorLite> hashinator = new AtomicReference();
    private final Object hashinatorReady = new Object();
    private static final Map<Integer, ClientConnection> noPartitionLeaders = Collections.emptyMap();
    private final AtomicReference<Map<Integer, ClientConnection>> partitionLeaders = new AtomicReference<Map<Integer, ClientConnection>>(noPartitionLeaders);
    private final Map<Integer, ClientAffinityStats> clientAffinityStats = new HashMap<Integer, ClientAffinityStats>();
    private final Map<Integer, ClientConnectionRequestStats> connectionStats = new HashMap<Integer, ClientConnectionRequestStats>();
    private static final Map<Integer, Integer> noPartitionKeys = Collections.emptyMap();
    private final AtomicReference<Map<Integer, Integer>> partitionKeys = new AtomicReference<Map<Integer, Integer>>(noPartitionKeys);
    private final AtomicLong partitionKeysTimestamp = new AtomicLong(0L);
    private final List<Consumer<Throwable>> partitionKeysWaiters = new ArrayList<Consumer<Throwable>>();
    private final AtomicBoolean partitionKeysUpdateInProgress = new AtomicBoolean(false);
    private static final long DEFAULT_PARTITION_KEYS_CACHE_REFRESH = TimeUnit.SECONDS.toNanos(1L);
    private long partitionKeysCacheRefresh = DEFAULT_PARTITION_KEYS_CACHE_REFRESH;
    private static final Map<String, ProcInfo> noProcInfo = Collections.emptyMap();
    private final AtomicReference<Map<String, ProcInfo>> procInfoMap = new AtomicReference<Map<String, ProcInfo>>(noProcInfo);
    private final Map<Long, RequestContext> requestMap = new ConcurrentHashMap<Long, RequestContext>();
    static final int DEFAULT_REQUEST_HARD_LIMIT = 1000;
    static final int DEFAULT_REQUEST_WARNING_LEVEL = 800;
    static final int DEFAULT_REQUEST_RESUME_LEVEL = 200;
    private int requestHardLimit = 1000;
    private int requestWarningLevel = 800;
    private int requestResumeLevel = 200;
    private final Object requestBackpressureLock = new Object();
    private volatile boolean requestBackpressureOn;
    static final int DEFAULT_TXN_OUT_LIMIT = 100;
    private final Semaphore sendPermits = new Semaphore(100);
    private int outLimit = 100;
    static final int DEFAULT_BACKPRESSURE_QUEUE_LIMIT = 100;
    private int backpressureQueueLimit = 100;
    private static final PrioOrder priorityOrder = new PrioOrder();
    private static final boolean USE_PRIORITY_QUEUE = true;
    private static final int INITIAL_QUEUE_CAPACITY = 1000;
    private final List<ClientConnection> connectionList = new CopyOnWriteArrayList<ClientConnection>();
    private final Map<Integer, ClientConnection> hostIdToConnection = new HashMap<Integer, ClientConnection>();
    private final Object connectionLock = new Object();
    private volatile int nextConnection = -1;
    private volatile String infoTablePortKey;
    private VoltNetworkPool networkPool;
    private final ThreadGroup workerGroup = new ThreadGroup("Client2-ConnectionWorkers");
    private final ConcurrentHashMap.KeySetView<Long, Boolean> activeHandles = ConcurrentHashMap.newKeySet();
    private final Set<HostInfo> connectHistory = new HashSet<HostInfo>();
    private long clusterTimestamp;
    private int clusterLeader;
    private String clusterBuildString;
    private ClientConnection subscribedConnection;
    private final AtomicBoolean subscriptionTaskPending = new AtomicBoolean(false);
    private static final long DEFAULT_RESUBSCRIPTION_DELAY = TimeUnit.SECONDS.toNanos(5L);
    private static final long RESUBSCRIPTION_FAILURE_DELAY = TimeUnit.SECONDS.toNanos(120L);
    private long resubscriptionDelay = DEFAULT_RESUBSCRIPTION_DELAY;
    private long resubscriptionFailureDelay = RESUBSCRIPTION_FAILURE_DELAY;
    private final AtomicBoolean topoRefreshTaskPending = new AtomicBoolean(false);
    private static final long DEFAULT_TOPO_REFRESH_DELAY = TimeUnit.SECONDS.toNanos(1L);
    private static final long TOPO_REFRESH_FAILURE_DELAY = TimeUnit.SECONDS.toNanos(120L);
    private long topoRefreshDelay = DEFAULT_TOPO_REFRESH_DELAY;
    private long topoRefreshFailureDelay = TOPO_REFRESH_FAILURE_DELAY;
    private final AtomicBoolean connectionTaskPending = new AtomicBoolean(false);
    static final long DEFAULT_RECONNECT_DELAY = TimeUnit.SECONDS.toNanos(1L);
    static final long DEFAULT_RECONNECT_RETRY_DELAY = TimeUnit.SECONDS.toNanos(15L);
    private long reconnectDelay = DEFAULT_RECONNECT_DELAY;
    private long reconnectRetryDelay = DEFAULT_RECONNECT_RETRY_DELAY;
    private boolean autoConnectionMgmt = true;
    private ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(r -> Client2Impl.newDaemonThread(r, "Client2-Timer"));
    private ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor(r -> Client2Impl.newDaemonThread(r, "Client2-Exec"));
    static final int DEFAULT_RESPONSE_THREADS = 4;
    private int responseThreadCount = 4;
    private ExecutorService responseService;
    private boolean stopResponseServiceAtShutdown;
    private final RateLimiter2 rateLimiter;
    private final Client2Notification.ConnectionStatus notificationConnectionUp;
    private final Client2Notification.ConnectionStatus notificationConnectionDown;
    private final Client2Notification.ConnectionStatus notificationConnectFailure;
    private final Client2Notification.LateResponse notificationLateResponse;
    private final Client2Notification.RequestBackpressure notificationRequestBackpressure;
    private Client2Notification.ErrorLog errorLog = this::printError;
    private volatile boolean isShutdown;
    private BulkLoaderState bulkState;

    private static BlockingQueue<RequestContext> allocateQueue() {
        return new PriorityBlockingQueue<RequestContext>(1000, priorityOrder);
    }

    public Client2Impl(Client2Config config) {
        this.subject = config.subject;
        this.username = config.username != null ? config.username : "";
        this.hashScheme = config.hashScheme;
        if (config.cleartext) {
            String passwd = config.password != null ? config.password : "";
            this.passwordHash = ConnectionUtil.getHashedPassword(this.hashScheme, passwd);
        } else {
            this.passwordHash = Encoder.hexDecode(config.password);
        }
        if (config.enableSsl) {
            this.sslContext = ClientSslSetup.createClientSslContext(config.sslConfig);
            this.sslHostCheck = config.enableSslHostCheck;
            this.cipherService = CipherExecutor.CLIENT;
            this.cipherService.startup();
        } else {
            this.sslContext = null;
            this.sslHostCheck = false;
            this.cipherService = null;
        }
        this.rateLimiter = config.txnPerSecRateLimit > 0 ? new RateLimiter2(config.txnPerSecRateLimit) : null;
        this.notificationConnectionUp = config.notificationConnectionUp;
        this.notificationConnectionDown = config.notificationConnectionDown;
        this.notificationConnectFailure = config.notificationConnectFailure;
        this.notificationLateResponse = config.notificationLateResponse;
        this.notificationRequestBackpressure = config.notificationRequestBackpressure;
        if (config.notificationErrorLog != null) {
            this.errorLog = config.notificationErrorLog;
        }
        if (config.responseExecutorService != null) {
            this.responseService = config.responseExecutorService;
            this.stopResponseServiceAtShutdown = config.stopResponseServiceOnClose;
        } else {
            AtomicInteger respThreadNum = new AtomicInteger();
            this.responseThreadCount = config.responseThreadCount;
            this.responseService = Executors.newFixedThreadPool(this.responseThreadCount, r -> Client2Impl.newDaemonThread(r, "Client2-Response-" + respThreadNum.incrementAndGet()));
            this.stopResponseServiceAtShutdown = true;
        }
        this.networkPool = new VoltNetworkPool(1, 1, "Client2");
        this.networkPool.start();
        this.defaultRequestPriority = config.requestPriority;
        this.connectTimeout = config.connectionSetupTimeout;
        this.procedureCallTimeout = config.procedureCallTimeout;
        this.connectionResponseTimeout = config.connectionResponseTimeout;
        this.setOutstandingTxnLimit(config.outstandingTxnLimit);
        this.setRequestLimits(config.requestHardLimit, config.requestWarningLevel, config.requestResumeLevel);
        this.backpressureQueueLimit = config.networkBackpressureLevel;
        this.timerService.scheduleAtFixedRate(new TimeoutTask(), 1L, 1L, TimeUnit.SECONDS);
        this.reconnectDelay = config.reconnectDelay;
        this.reconnectRetryDelay = config.reconnectRetryDelay;
        this.autoConnectionMgmt = !config.disableConnectionMgmt;
    }

    @Override
    public void setRequestLimits(int limit, int warning, int resume) {
        this.requestHardLimit = Math.max(limit, 1);
        this.requestWarningLevel = Math.min(Math.max(warning, 1), this.requestHardLimit);
        this.requestResumeLevel = Math.min(Math.max(resume, 0), this.requestWarningLevel);
    }

    @Override
    public int currentRequestCount() {
        return this.requestMap.size();
    }

    @Override
    public int setOutstandingTxnLimit(int limit) {
        int newLimit = Math.max(limit, 1);
        int delta = newLimit - this.outLimit;
        if (delta > 0) {
            this.sendPermits.release(delta);
        } else if (delta < 0) {
            int drained = this.sendPermits.drainPermits();
            if (-delta < drained) {
                this.sendPermits.release(drained + delta);
            } else if (-delta > drained) {
                newLimit = this.outLimit - drained;
            }
        }
        this.outLimit = newLimit;
        return newLimit;
    }

    @Override
    public int outstandingTxnCount() {
        return this.outLimit - this.sendPermits.availablePermits();
    }

    @Override
    public void connectSync(String servers, long timeout, long delay, TimeUnit unit) throws IOException {
        List<HostInfo> hostList = this.hostInfoList(servers);
        this.toSyncReturn(this.doConnect(hostList, timeout, delay, unit));
    }

    @Override
    public void connectSync(String servers) throws IOException {
        this.connectSync(servers, 0L, 0L, TimeUnit.NANOSECONDS);
    }

    @Override
    public void connectSync(String host, int port, long timeout, long delay, TimeUnit unit) throws IOException {
        List<HostInfo> hostList = this.hostInfoList(host, port);
        this.toSyncReturn(this.doConnect(hostList, timeout, delay, unit));
    }

    @Override
    public void connectSync(String host, int port) throws IOException {
        this.connectSync(host, port, 0L, 0L, TimeUnit.NANOSECONDS);
    }

    @Override
    public CompletableFuture<Void> connectAsync(String servers, long timeout, long delay, TimeUnit unit) {
        List<HostInfo> hostList = this.hostInfoList(servers);
        return this.doConnect(hostList, timeout, delay, unit);
    }

    @Override
    public CompletableFuture<Void> connectAsync(String servers) {
        return this.connectAsync(servers, 0L, 0L, TimeUnit.NANOSECONDS);
    }

    @Override
    public CompletableFuture<Void> connectAsync(String host, int port, long timeout, long delay, TimeUnit unit) {
        List<HostInfo> hostList = this.hostInfoList(host, port);
        return this.doConnect(hostList, timeout, delay, unit);
    }

    @Override
    public CompletableFuture<Void> connectAsync(String host, int port) {
        return this.connectAsync(host, port, 0L, 0L, TimeUnit.NANOSECONDS);
    }

    @Override
    public List<InetSocketAddress> connectedHosts() {
        ArrayList<InetSocketAddress> list = new ArrayList<InetSocketAddress>();
        for (ClientConnection cxn : this.connectionList) {
            list.add(cxn.connection.getRemoteSocketAddress());
        }
        return list;
    }

    @Override
    public String clusterBuildString() {
        return this.clusterBuildString;
    }

    @Override
    public Object[] clusterInstanceId() {
        return new Object[]{this.clusterTimestamp, this.clusterLeader};
    }

    @Override
    public CompletableFuture<ClientResponse> callProcedureAsync(String procName, Object ... parameters) {
        return this.doProcCall(this.procedureCallTimeout, -1L, -1, this.defaultRequestPriority, procName, parameters);
    }

    @Override
    public ClientResponse callProcedureSync(String procName, Object ... parameters) throws IOException, ProcCallException {
        return this.toSyncProcCall(this.callProcedureAsync(procName, parameters));
    }

    @Override
    public CompletableFuture<ClientResponse> callProcedureAsync(Client2CallOptions options, String procName, Object ... parameters) {
        long clientTmo = this.procedureCallTimeout;
        long queryTmo = -1L;
        int reqPrio = this.defaultRequestPriority;
        if (options != null) {
            if (options.clientTimeout != null) {
                clientTmo = options.clientTimeout;
            }
            if (options.queryTimeout != null) {
                queryTmo = options.queryTimeout;
            }
            if (options.requestPriority != null) {
                reqPrio = options.requestPriority;
            }
        }
        return this.doProcCall(clientTmo, queryTmo, -1, reqPrio, procName, parameters);
    }

    @Override
    public ClientResponse callProcedureSync(Client2CallOptions options, String procName, Object ... parameters) throws IOException, ProcCallException {
        return this.toSyncProcCall(this.callProcedureAsync(options, procName, parameters));
    }

    @Override
    public CompletableFuture<ClientResponseWithPartitionKey[]> callAllPartitionProcedureAsync(Client2CallOptions options, String procName, Object ... parameters) {
        long clientTmo = this.procedureCallTimeout;
        long queryTmo = -1L;
        int reqPrio = this.defaultRequestPriority;
        if (options != null) {
            if (options.clientTimeout != null) {
                clientTmo = options.clientTimeout;
            }
            if (options.queryTimeout != null) {
                queryTmo = options.queryTimeout;
            }
            if (options.requestPriority != null) {
                reqPrio = options.requestPriority;
            }
        }
        return this.doAllPartitionCall(clientTmo, queryTmo, reqPrio, procName, parameters);
    }

    @Override
    public ClientResponseWithPartitionKey[] callAllPartitionProcedureSync(Client2CallOptions options, String procName, Object ... parameters) throws IOException {
        return this.toSyncAllPartCall(this.callAllPartitionProcedureAsync(options, procName, parameters));
    }

    @Override
    public void drain() throws InterruptedException {
        this.doDrainRequests();
    }

    @Override
    public void close() {
        try {
            this.doClientShutdown();
            ClientFactory.decreaseClientNum();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public ClientStatsContext createStatsContext() {
        return new ClientStatsContext(this, this.getStatsSnapshot(), this.getIOStatsSnapshot(), this.getAffinityStatsSnapshot());
    }

    private CompletableFuture<Void> doConnect(List<HostInfo> servers, long timeout, long delay, TimeUnit unit) {
        long tmoNs = timeout > 0L ? unit.toNanos(timeout) : 0L;
        long delayNs = delay > 0L ? unit.toNanos(delay) : 0L;
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.execService.schedule(new UserConnectionTask(servers, tmoNs, delayNs, future), 0L, TimeUnit.NANOSECONDS);
        return future;
    }

    private List<HostInfo> hostInfoList(String servers) {
        ArrayList<HostInfo> list = new ArrayList<HostInfo>();
        for (String srv : servers.split(",")) {
            if ((srv = srv.trim()).isEmpty()) continue;
            list.add(HostInfo.fromString(srv, 21212));
        }
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Empty server list");
        }
        return list;
    }

    private List<HostInfo> hostInfoList(String host, int port) {
        ArrayList<HostInfo> list = new ArrayList<HostInfo>(1);
        list.add(HostInfo.fromParts(host, port));
        return list;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createConnection(HostInfo server) throws IOException {
        String buildStr;
        int leaderAddr;
        long timestamp;
        int hostId;
        String host = server.getHost();
        int port = server.getPort();
        if (this.isShutdown) {
            throw new IllegalStateException("shutting down");
        }
        if (server.unresolvedHostName()) {
            ClientConnection tempCxn = new ClientConnection(server);
            this.notifyConnectFailure(tempCxn);
            String msg = String.format("Unknown host name or malformed address: %s", host);
            throw new UnknownHostException(msg);
        }
        SSLEngine sslEngine = null;
        if (this.sslContext != null) {
            sslEngine = this.sslContext.newEngine(ByteBufAllocator.DEFAULT, host, port);
            if (this.sslHostCheck) {
                SSLParameters parms = sslEngine.getSSLParameters();
                parms.setEndpointIdentificationAlgorithm("HTTPS");
                sslEngine.setSSLParameters(parms);
            }
        }
        ClientConnection cxn = new ClientConnection(server);
        SocketChannel channel = null;
        try {
            Object[] connectResults = ConnectionUtil.getAuthenticatedConnection(host, this.username, this.passwordHash, port, this.subject, this.hashScheme, sslEngine, TimeUnit.NANOSECONDS.toMillis(this.connectTimeout));
            channel = (SocketChannel)connectResults[0];
            long[] instanceId = (long[])connectResults[1];
            hostId = (int)instanceId[0];
            timestamp = instanceId[2];
            leaderAddr = (int)instanceId[3];
            buildStr = (String)connectResults[2];
            Connection c = this.networkPool.registerChannel(channel, cxn, this.cipherService, sslEngine, EstTime::currentTimeMillis);
            c.writeStream().setPendingWriteBackpressureThreshold(this.backpressureQueueLimit);
            cxn.setConnection(c, hostId);
            ClientConnectionRequestStats stats = new ClientConnectionRequestStats(hostId, host);
            cxn.setConnectionRequestStats(stats);
        }
        catch (IOException | RuntimeException ex) {
            this.closeChannel(channel);
            this.notifyConnectFailure(cxn);
            throw ex;
        }
        IOException fail = null;
        ClientConnection prevCxn = null;
        Object object = this.connectionLock;
        synchronized (object) {
            if (this.connectionList.size() == 0 || this.clusterTimestamp == 0L) {
                this.clusterTimestamp = timestamp;
                this.clusterLeader = leaderAddr;
                this.clusterBuildString = buildStr;
                this.connectHistory.clear();
            } else if (this.clusterTimestamp != timestamp || this.clusterLeader != leaderAddr) {
                String msg = String.format("Cluster instance id mismatch: current is %d,%d, server's is %d,%d", this.clusterTimestamp, this.clusterLeader, timestamp, leaderAddr);
                fail = new IOException(msg);
            }
            if (fail == null) {
                prevCxn = this.hostIdToConnection.put(hostId, cxn);
                this.addConnectionStats(cxn.connectionRequestStats);
                this.connectHistory.add(server);
                this.connectionList.add(cxn);
                cxn.start();
                if (!this.ensureSubscription(0L)) {
                    this.refreshTopology(this.topoRefreshDelay);
                }
            }
        }
        if (prevCxn != null) {
            this.logError("Warning: replaced connection for host id %d (%s)", hostId, prevCxn.getServer());
            prevCxn.getConnection().unregister();
        }
        if (fail != null) {
            cxn.getConnection().unregister();
            this.notifyConnectFailure(cxn);
            throw fail;
        }
        this.notifyConnectionUp(cxn);
    }

    private void closeChannel(SocketChannel channel) {
        try {
            if (channel != null) {
                channel.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ClientConnection getConnectionForHost(int id) {
        Object object = this.connectionLock;
        synchronized (object) {
            return this.hostIdToConnection.get(id);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeConnection(ClientConnection cxn) {
        this.notifyConnectionDown(cxn);
        Iterator<RequestContext> iterator = this.connectionLock;
        synchronized (iterator) {
            this.connectionList.remove(cxn);
            if (this.hostIdToConnection.get(cxn.hostId) == cxn) {
                this.hostIdToConnection.remove(cxn.hostId);
            }
            Iterator<Map.Entry<Integer, ClientConnection>> it = this.partitionLeaders.get().entrySet().iterator();
            while (it.hasNext()) {
                if (it.next().getValue() != cxn) continue;
                it.remove();
            }
            if (this.connectionList.isEmpty()) {
                this.subscribedConnection = null;
                this.scheduleFirstConnection(this.connectHistory, this.reconnectDelay);
            } else if (this.subscribedConnection == cxn) {
                this.subscribedConnection = null;
                this.ensureSubscription(this.resubscriptionDelay);
            }
        }
        for (RequestContext req : this.requestMap.values()) {
            if (req.cxn != cxn) continue;
            this.completeRequestOnHostDown(req);
        }
    }

    private void notifyConnectFailure(ClientConnection cxn) {
        if (this.notificationConnectFailure != null) {
            String host = cxn.getServer().getHost();
            int port = cxn.getServer().getPort();
            this.debug("Connect failed: %s port %d", host, port);
            this.notifyConnectionEvent(this.notificationConnectFailure, host, port);
        }
    }

    private void notifyConnectionUp(ClientConnection cxn) {
        if (this.notificationConnectionUp != null) {
            String host = cxn.getServer().getHost();
            int port = cxn.getServer().getPort();
            this.debug("Connection up: %s port %d", host, port);
            this.notifyConnectionEvent(this.notificationConnectionUp, host, port);
        }
    }

    private void notifyConnectionDown(ClientConnection cxn) {
        if (this.notificationConnectionDown != null) {
            String host = cxn.getServer().getHost();
            int port = cxn.getServer().getPort();
            this.debug("Connection down: %s port %d", host, port);
            this.notifyConnectionEvent(this.notificationConnectionDown, host, port);
        }
    }

    private void notifyConnectionEvent(Client2Notification.ConnectionStatus notification, String host, int port) {
        if (notification != null) {
            try {
                notification.accept(host, port);
            }
            catch (Exception ex) {
                this.logError("Unhandled exception from notification handler: " + ex, new Object[0]);
            }
        }
    }

    private CompletableFuture<ClientResponse> doProcCall(long clientTimeout, long queryTimeout, int destinationPartition, int requestPrio, String procName, Object ... params) {
        int queryTmoMs;
        CompletableFuture<ClientResponse> future = new CompletableFuture<ClientResponse>();
        if (this.isShutdown) {
            future.completeExceptionally(new IllegalStateException("shutting down"));
            return future;
        }
        if (procName == null || procName.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("Procedure name required"));
            return future;
        }
        if (requestPrio < 1 || requestPrio > 8) {
            String err = String.format("Invalid request priority %d; range is %d to %d", requestPrio, 1, 8);
            future.completeExceptionally(new IllegalArgumentException(err));
            return future;
        }
        int requestCount = this.requestMap.size();
        if (requestCount >= this.requestHardLimit) {
            String msg = String.format("In-progress request limit %d exceeded", this.requestHardLimit);
            future.completeExceptionally(new RequestLimitException(msg));
            return future;
        }
        long handle = this.handleGenerator.incrementAndGet();
        ProcedureInvocation invocation = new ProcedureInvocation(handle, queryTmoMs = (int)(queryTimeout > 0L ? TimeUnit.NANOSECONDS.toMillis(queryTimeout) : queryTimeout), destinationPartition, requestPrio, procName, params);
        ClientConnection cxn = this.findConnection(invocation);
        if (cxn == null) {
            String msg = "No connections to cluster at this time";
            future.completeExceptionally(new NoConnectionsException(msg));
            return future;
        }
        RequestContext reqCtx = new RequestContext(future, invocation, clientTimeout, cxn);
        this.requestMap.put(handle, reqCtx);
        if (requestCount + 1 >= this.requestWarningLevel && !this.requestBackpressureOn) {
            this.reportRequestBackpressure(true);
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limitSendRate();
        }
        cxn.enqueue(reqCtx);
        this.incrementInvocation(handle, cxn);
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reportRequestBackpressure(boolean slowdown) {
        if (this.notificationRequestBackpressure != null) {
            boolean trigger;
            int count = this.requestMap.size();
            boolean bl = slowdown ? count >= this.requestWarningLevel : (trigger = count <= this.requestResumeLevel);
            if (trigger) {
                Object object = this.requestBackpressureLock;
                synchronized (object) {
                    if (slowdown ^ this.requestBackpressureOn) {
                        this.requestBackpressureOn = slowdown;
                        try {
                            this.notificationRequestBackpressure.accept(slowdown);
                        }
                        catch (Exception ex) {
                            this.logError("Unhandled exception from notification handler: " + ex, new Object[0]);
                        }
                    }
                }
            }
        }
    }

    private CompletableFuture<ClientResponseWithPartitionKey[]> doAllPartitionCall(long clientTimeout, long queryTimeout, int requestPrio, String procName, Object ... params) {
        AllPartitionCallContext context = new AllPartitionCallContext(clientTimeout, queryTimeout, requestPrio, procName, params);
        if (this.isShutdown) {
            context.future.completeExceptionally(new IllegalStateException("shutting down"));
        } else {
            this.refreshPartitionKeys(th -> {
                if (th != null) {
                    context.future.completeExceptionally((Throwable)th);
                } else {
                    this.doAllPartitionCall(context);
                }
            });
        }
        return context.future;
    }

    private void doAllPartitionCall(AllPartitionCallContext context) {
        Object[] args = new Object[context.params.length + 1];
        System.arraycopy(context.params, 0, args, 1, context.params.length);
        Map<Integer, Integer> idToKey = this.partitionKeys.get();
        ClientResponseWithPartitionKey[] responses = new ClientResponseWithPartitionKey[idToKey.size()];
        AtomicInteger count = new AtomicInteger(idToKey.size());
        int index = 0;
        for (Map.Entry<Integer, Integer> ent : idToKey.entrySet()) {
            Integer partitionId = ent.getKey();
            Integer partitionKey = ent.getValue();
            args[0] = partitionKey;
            int thisIndex = index++;
            this.doProcCall(context.clientTimeout, context.queryTimeout, partitionId, context.requestPrio, context.procName, args).whenComplete((resp, th) -> this.onePartitionComplete(context.future, responses, thisIndex, count, partitionKey, partitionId, (ClientResponse)resp, (Throwable)th));
        }
    }

    private void onePartitionComplete(CompletableFuture<ClientResponseWithPartitionKey[]> future, ClientResponseWithPartitionKey[] respArray, int index, AtomicInteger count, Integer key, int partitionId, ClientResponse resp, Throwable th) {
        if (resp == null) {
            String err = null;
            if (th != null) {
                err = th.getMessage();
            }
            if (err == null) {
                err = "unspecified error";
            }
            resp = new ClientResponseImpl(-3, new VoltTable[0], err);
        }
        respArray[index] = new ClientResponseWithPartitionKey(key, resp, partitionId);
        if (count.decrementAndGet() == 0) {
            future.complete(respArray);
        }
    }

    private void doDrainRequests() throws InterruptedException {
        int sleep = 500000;
        int incSleep = 500000;
        long maxSleep = 5000000L;
        while (!this.requestMap.isEmpty()) {
            LockSupport.parkNanos(sleep);
            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted in drain");
            }
            if ((long)sleep >= 5000000L) continue;
            sleep += 500000;
        }
    }

    private void doDrainTasks() throws InterruptedException {
        int sleep = 500000;
        int incSleep = 500000;
        long maxSleep = 5000000L;
        while (this.subscriptionTaskPending.get() || this.topoRefreshTaskPending.get() || this.connectionTaskPending.get()) {
            LockSupport.parkNanos(sleep);
            if (!Thread.interrupted()) continue;
            throw new InterruptedException("Interrupted in drain");
        }
    }

    public void doClientShutdown() {
        this.isShutdown = true;
        try {
            this.doDrainTasks();
            this.doDrainRequests();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (this.timerService != null) {
            this.stopService(this.timerService);
            this.timerService = null;
        }
        if (this.execService != null) {
            this.stopService(this.execService);
            this.execService = null;
        }
        if (this.networkPool != null) {
            try {
                this.networkPool.shutdown();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.networkPool = null;
        }
        if (this.responseService != null) {
            if (this.stopResponseServiceAtShutdown) {
                this.stopService(this.responseService);
            }
            this.responseService = null;
        }
        this.hashinator.set(null);
        this.partitionLeaders.set(noPartitionLeaders);
        this.partitionKeys.set(noPartitionKeys);
        this.clientAffinityStats.clear();
        this.procInfoMap.set(noProcInfo);
        if (this.cipherService != null) {
            this.cipherService.shutdown();
            this.cipherService = null;
        }
        this.activeHandles.clear();
        this.requestMap.clear();
        this.connectionList.clear();
        this.hostIdToConnection.clear();
        this.connectHistory.clear();
        this.subscribedConnection = null;
    }

    private void stopService(ExecutorService service) {
        service.shutdown();
        try {
            if (!service.awaitTermination(10L, TimeUnit.SECONDS)) {
                service.shutdownNow();
            }
        }
        catch (InterruptedException ex) {
            service.shutdownNow();
        }
    }

    private void connectionWorker(ClientConnection cxn) {
        while (cxn.isConnected()) {
            RequestContext req = null;
            try {
                req = cxn.dequeue();
                long timeLeft = this.remainingTime(req.startTime, req.timeout);
                req.holdsPermit = this.sendPermits.tryAcquire();
                while (!req.holdsPermit) {
                    req.holdsPermit = this.sendPermits.tryAcquire(timeLeft, TimeUnit.NANOSECONDS);
                    timeLeft = this.remainingTime(req.startTime, req.timeout);
                }
                if (this.awaitClearToSend(cxn, req.startTime, req.timeout)) {
                    timeLeft = this.remainingTime(req.startTime, req.timeout);
                }
                long timeLeftMicros = TimeUnit.NANOSECONDS.toMicros(timeLeft) + 1L;
                req.invocation.setRequestTimeout(timeLeftMicros > Integer.MAX_VALUE ? -1 : (int)timeLeftMicros);
                ByteBuffer buf = this.serializeInvocation(req.invocation);
                this.activeHandles.add(req.invocation.getHandle());
                if (req.timeout < ONE_SECOND_NANOS) {
                    this.setShortTimeoutTask(req, timeLeft);
                }
                cxn.writeToNetwork(buf);
            }
            catch (LocalTimeoutException ex) {
                String err = String.format("Procedure call timed out before sending (timeout %s, elapsed %s)", Client2Impl.timeoutString(ex.timeout), Client2Impl.timeoutString(ex.elapsed));
                this.completeRequestOnLocalFailure(req, true, err);
            }
            catch (SerializationException ex) {
                this.completeRequestOnLocalFailure(req, false, ex.getMessage());
            }
            catch (InterruptedException ex) {
                if (req == null) continue;
                this.completeRequestOnLocalFailure(req, false, "interrupted");
            }
            catch (Exception ex) {
                String msg = String.format("Unexpected exception in sender: %s", ex.getMessage());
                this.logError(msg, new Object[0]);
                this.completeRequestOnLocalFailure(req, false, msg);
            }
        }
        cxn.clearQueue();
    }

    private void setShortTimeoutTask(RequestContext req, long remaining) {
        req.timer = this.timerService.schedule(new SingleTimeoutTask(req), remaining, TimeUnit.NANOSECONDS);
    }

    private void cancelShortTimeoutTask(RequestContext req) {
        Future<?> timer;
        if (req != null && (timer = req.timer) != null) {
            req.timer = null;
            timer.cancel(false);
        }
    }

    private void releasePermit(RequestContext req) {
        if (req.holdsPermit) {
            req.holdsPermit = false;
            this.sendPermits.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean awaitClearToSend(ClientConnection cxn, long startTime, long timeout) throws LocalTimeoutException, InterruptedException {
        boolean waited = false;
        Object object = cxn.backpressureLock;
        synchronized (object) {
            while (cxn.backpressure) {
                waited = true;
                long remaining = this.remainingTime(startTime, timeout);
                cxn.backpressureLock.wait(remaining / 1000000L, (int)(remaining % 1000000L));
            }
        }
        return waited;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void networkBackpressure(ClientConnection cxn, boolean state) {
        Object object = cxn.backpressureLock;
        synchronized (object) {
            cxn.backpressure = state;
            if (!state) {
                cxn.backpressureLock.notifyAll();
            }
        }
    }

    private void handleResponse(ClientConnection cxn, ByteBuffer buf, long endTime) {
        try {
            this.handleResponseImpl(cxn, buf, endTime);
        }
        catch (Exception ex) {
            this.logError("Unhandled exception in response processing: %s", ex);
        }
    }

    private void addConnectionStats(ClientConnectionRequestStats stats) {
        Iterator<Map.Entry<Integer, ClientConnectionRequestStats>> e = this.connectionStats.entrySet().iterator();
        while (e.hasNext()) {
            if (!e.next().getValue().expired()) continue;
            e.remove();
        }
        this.connectionStats.put(stats.getHostId(), stats);
    }

    private void incrementInvocation(long clientHandle, ClientConnection cxn) {
        ClientConnectionRequestStats stats;
        if (clientHandle >= 0L && clientHandle < 0x7FFFFFFFFFFFFFFBL && (stats = this.connectionStats.get(cxn.hostId)) != null) {
            stats.incrementInvocation();
        }
    }

    private void incrementResponse(long clientHandle, ClientConnection cxn) {
        if (clientHandle >= 0L && clientHandle < 0x7FFFFFFFFFFFFFFBL) {
            ClientConnectionRequestStats stats = this.connectionStats.get(cxn.hostId);
            stats.incrementResponse();
        }
    }

    public Map<Integer, ClientConnectionRequestStats> getConnectionRequestStats() {
        return this.connectionStats;
    }

    public void resetConnectionRequestStats() {
        for (ClientConnectionRequestStats e : this.connectionStats.values()) {
            e.reset();
        }
    }

    private void handleResponseImpl(ClientConnection cxn, ByteBuffer buf, long endTime) throws IOException {
        ClientResponseImpl response = new ClientResponseImpl();
        response.initFromBuffer(buf);
        long handle = response.getClientHandle();
        RequestContext context = this.removeRequest(handle);
        this.incrementResponse(handle, cxn);
        if (handle >= 0L && handle <= 0x7FFFFFFFFFFFFFFBL) {
            if (context != null) {
                this.sendPermits.release();
                long elapsedTime = Math.max(endTime - context.startTime, 1L);
                response.setClientRoundtrip(elapsedTime);
                int clusterRTT = response.getClusterRoundtrip();
                boolean abort = response.aborted();
                boolean fail = response.failed();
                String procName = context.invocation.getProcName();
                context.cxn.clientStats(procName).update(elapsedTime, clusterRTT, abort, fail, false);
                context.future.complete(response);
            } else {
                this.notifyLateResponse(response, cxn);
            }
        } else if (handle < 0L) {
            if (context != null) {
                context.future.complete(response);
            } else {
                this.logError("Late response to system procedure call", new Object[0]);
            }
        } else if (handle == 0x7FFFFFFFFFFFFFFEL) {
            this.topoStatsCompletion(response, null);
        } else if (handle == 0x7FFFFFFFFFFFFFFDL) {
            this.procedureCatalogCompletion(response, null);
        } else if (handle == 0x7FFFFFFFFFFFFFFCL) {
            cxn.setOutOfService();
        } else {
            this.logError("Received notification with unexpected handle %d: ignored", handle);
        }
        if (this.requestBackpressureOn) {
            this.reportRequestBackpressure(false);
        }
    }

    private RequestContext removeRequest(long handle) {
        this.activeHandles.remove(handle);
        RequestContext req = this.requestMap.remove(handle);
        this.cancelShortTimeoutTask(req);
        return req;
    }

    private void notifyLateResponse(ClientResponse resp, ClientConnection cxn) {
        if (this.notificationLateResponse != null) {
            String host = cxn.getServer().getHost();
            int port = cxn.getServer().getPort();
            byte status = resp.getStatus();
            if (this.notificationLateResponse != null) {
                try {
                    this.notificationLateResponse.accept(resp, host, port);
                }
                catch (Exception ex) {
                    this.logError("Unhandled exception from notification handler: " + ex, new Object[0]);
                }
            }
        }
    }

    private ClientConnection findConnection(ProcedureInvocation invocation) {
        ProcInfo procInfo = this.procInfoMap.get().get(invocation.getProcName());
        boolean readOnly = procInfo != null && procInfo.readOnly;
        HashinatorLite hashi = this.hashinator.get();
        int hashedPartition = -1;
        if (invocation.hasPartitionDestination()) {
            hashedPartition = invocation.getPartitionDestination();
        } else if (hashi != null && procInfo != null) {
            switch (procInfo.procType) {
                case SINGLE: {
                    if (procInfo.partitionParameter == -1 || procInfo.partitionParameter >= invocation.getPassedParamCount()) break;
                    hashedPartition = hashi.getHashedPartitionForParameter(procInfo.parameterType, invocation.getPartitionParamValue(procInfo.partitionParameter));
                    break;
                }
                case MULTI: {
                    hashedPartition = 16383;
                    break;
                }
            }
        }
        boolean byAffinity = true;
        ClientConnection cxn = this.partitionLeaders.get().get(hashedPartition);
        if (cxn == null || !cxn.isConnected() || cxn.isOutOfService()) {
            cxn = this.findCxnByRoundRobin(invocation);
            byAffinity = false;
        }
        if (cxn != null) {
            this.updateAffinityStats(hashedPartition, readOnly, byAffinity);
        }
        return cxn;
    }

    private ClientConnection findCxnByRoundRobin(ProcedureInvocation invocation) {
        ArrayList<ClientConnection> localList = new ArrayList<ClientConnection>(this.connectionList.size());
        for (ClientConnection cxn : this.connectionList) {
            if (cxn.isOutOfService()) continue;
            localList.add(cxn);
        }
        int cxnCount = localList.size();
        for (int i = 0; i < 2; ++i) {
            for (int j = 0; j < cxnCount; ++j) {
                int n;
                this.nextConnection = n = (this.nextConnection + 1) % cxnCount;
                ClientConnection cxn = (ClientConnection)localList.get(n);
                if (!cxn.isConnected() || i <= 0 && cxn.backpressure) continue;
                return cxn;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateAffinityStats(Integer hashedPartition, boolean readOnly, boolean affinityUsed) {
        ClientAffinityStats stats = null;
        Map<Integer, ClientAffinityStats> map = this.clientAffinityStats;
        synchronized (map) {
            stats = this.clientAffinityStats.computeIfAbsent(hashedPartition, ClientAffinityStats::new);
        }
        if (affinityUsed) {
            if (readOnly) {
                stats.addAffinityRead();
            } else {
                stats.addAffinityWrite();
            }
        } else if (readOnly) {
            stats.addRrRead();
        } else {
            stats.addRrWrite();
        }
    }

    private ByteBuffer serializeInvocation(ProcedureInvocation pi) throws SerializationException {
        try {
            int size = pi.getSerializedSize();
            ByteBuffer buf = ByteBuffer.allocate(size + 4);
            buf.putInt(size);
            pi.flattenToBuffer(buf);
            buf.flip();
            return buf;
        }
        catch (IOException ex) {
            throw new SerializationException(ex.getMessage());
        }
    }

    private long remainingTime(long startTime, long timeout) throws LocalTimeoutException {
        long now = System.nanoTime();
        long remaining = Math.max(startTime + timeout - now, 0L);
        if (remaining <= 0L) {
            throw new LocalTimeoutException(now - startTime, timeout);
        }
        return remaining;
    }

    private void completeRequestOnLocalFailure(RequestContext req, boolean tmo, String err) {
        long handle = req.invocation.getHandle();
        if (this.removeRequest(handle) != null) {
            byte status = tmo ? (byte)-15 : -14;
            ClientResponseImpl resp = new ClientResponseImpl(status, new VoltTable[0], err);
            resp.setClientHandle(handle);
            resp.setClientRoundtrip(Math.max(System.nanoTime() - req.startTime, 1L));
            this.releasePermit(req);
            req.future.complete(resp);
        }
    }

    private void completeRequestOnTimeout(RequestContext req, long elapsed) {
        long handle = req.invocation.getHandle();
        if (this.removeRequest(handle) != null) {
            String err = String.format("No response received in the allotted time (timeout %s, elapsed %s)", Client2Impl.timeoutString(req.timeout), Client2Impl.timeoutString(elapsed));
            ClientResponseImpl resp = new ClientResponseImpl(-16, new VoltTable[0], err);
            resp.setClientHandle(handle);
            resp.setClientRoundtrip(elapsed);
            int elapsedMS = (int)TimeUnit.NANOSECONDS.toMillis(elapsed);
            resp.setClusterRoundtrip(elapsedMS);
            if (handle >= 0L) {
                req.cxn.clientStats(req.invocation.getProcName()).update(elapsed, elapsedMS, false, false, true);
            }
            this.releasePermit(req);
            req.future.complete(resp);
        }
    }

    private static String timeoutString(long nanos) {
        int i;
        long[] lim = new long[]{10000000000L, 1000000L, 1000L, 0L};
        int[] div = new int[]{1000000000, 1000000, 1000, 1};
        String[] unit = new String[]{"sec", "ms", "\u00b5s", "ns"};
        for (i = 0; i < lim.length - 1 && nanos < lim[i]; ++i) {
        }
        return String.format("%d %s", nanos / (long)div[i], unit[i]);
    }

    private void completeRequestOnHostDown(RequestContext req) {
        long handle = req.invocation.getHandle();
        if (this.removeRequest(handle) != null) {
            String err = "Connection to host was lost before response was received";
            ClientResponseImpl resp = new ClientResponseImpl(-4, new VoltTable[0], err);
            resp.setClientHandle(handle);
            resp.setClientRoundtrip(Math.max(System.nanoTime() - req.startTime, 1L));
            this.releasePermit(req);
            req.future.complete(resp);
        }
    }

    private void callSystemProcedure(ClientConnection cxn, BiConsumer<ClientResponse, Throwable> completion, String procName, Object ... procParams) throws SerializationException {
        CompletableFuture<ClientResponse> future = new CompletableFuture<ClientResponse>();
        future.whenComplete((resp, th) -> completion.accept((ClientResponse)resp, this.unwrapThrowable((Throwable)th)));
        long handle = this.sysHandleGenerator.decrementAndGet();
        ProcedureInvocation pi = new ProcedureInvocation(handle, -1, -1, this.systemRequestPriority, procName, procParams);
        ByteBuffer buf = this.serializeInvocation(pi);
        RequestContext reqCtx = new RequestContext(future, pi, this.procedureCallTimeout, cxn);
        this.requestMap.put(handle, reqCtx);
        cxn.writeToNetwork(buf);
    }

    private Throwable unwrapThrowable(Throwable th) {
        while ((th instanceof ExecutionException || th instanceof CompletionException) && th.getCause() != null) {
            th = th.getCause();
        }
        return th;
    }

    private boolean checkSystemResponse(ClientResponse resp, Throwable th, String what, int minTableCount) {
        boolean ok = false;
        if (th != null) {
            this.logError("Call to %s completed exceptionally: %s", what, th);
        } else if (resp.getStatus() == 1) {
            int count;
            VoltTable[] results = resp.getResults();
            int n = count = results != null ? results.length : 0;
            if (count < minTableCount) {
                this.logError("Unexpected results from %s; needed %d tables, got %d", what, minTableCount, count);
            } else {
                ok = true;
            }
        } else if (resp.getStatus() != -4) {
            this.logError("Unexpected error %d returned from %s", resp.getStatus(), what);
        }
        return ok;
    }

    private boolean ensureSubscription(long delay) {
        boolean pending = false;
        if (!this.isShutdown && !this.connectionList.isEmpty() && this.subscribedConnection == null) {
            if (!this.subscriptionTaskPending.getAndSet(true)) {
                this.execService.schedule(new SubscriberTask(), delay, TimeUnit.NANOSECONDS);
            }
            pending = true;
        }
        return pending;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ClientConnection arbitraryConnection() throws UnavailableException {
        Object object = this.connectionLock;
        synchronized (object) {
            int sz = this.connectionList.size();
            if (sz == 0) {
                throw new UnavailableException("no connection available");
            }
            return this.connectionList.get(this.randomizer.nextInt(sz));
        }
    }

    private void subscribeCompletion(ClientResponse resp, Throwable th) {
        if (!this.checkSystemResponse(resp, th, "@Subscribe", 0)) {
            this.ensureSubscription(this.resubscriptionDelay);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void topoStatsCompletion(ClientResponse resp, Throwable th) {
        if (!this.checkSystemResponse(resp, th, "@Statistics TOPO", 2)) {
            return;
        }
        this.partitionKeysTimestamp.set(0L);
        VoltTable hashConfig = resp.getResults()[1];
        hashConfig.advanceRow();
        this.hashinator.set(new HashinatorLite(hashConfig.getVarbinary("HASHCONFIG"), false));
        Object object = this.hashinatorReady;
        synchronized (object) {
            this.hashinatorReady.notifyAll();
        }
        VoltTable partInfo = resp.getResults()[0];
        HashMap<Integer, ClientConnection> newPartitionLeaders = new HashMap<Integer, ClientConnection>(partInfo.getRowCount());
        HashSet<Integer> unconnected = new HashSet<Integer>();
        while (partInfo.advanceRow()) {
            Integer leaderId;
            ClientConnection cxn;
            Integer partition = (int)partInfo.getLong("Partition");
            String leader = partInfo.getString("Leader");
            String sites = partInfo.getString("Sites");
            if (sites != null && !sites.isEmpty()) {
                for (String site : sites.split(",")) {
                    Integer hostId = Integer.valueOf((site = site.trim()).split(":")[0]);
                    if (this.getConnectionForHost(hostId) != null) continue;
                    unconnected.add(hostId);
                }
            }
            if (leader == null || leader.isEmpty() || (cxn = this.getConnectionForHost(leaderId = Integer.valueOf(leader.split(":")[0]))) == null) continue;
            newPartitionLeaders.put(partition, cxn);
        }
        this.partitionLeaders.set(newPartitionLeaders);
        if (!unconnected.isEmpty()) {
            this.debug("%d hosts are not currently connected", unconnected.size());
            this.scheduleConnectionTask(unconnected, 0L);
        }
    }

    private void procedureCatalogCompletion(ClientResponse resp, Throwable th) {
        if (!this.checkSystemResponse(resp, th, "@SystemCatalog PROCEDURES", 1)) {
            return;
        }
        int badJson = 0;
        VoltTable procTable = resp.getResults()[0];
        HashMap<String, ProcInfo> newProcInfoMap = new HashMap<String, ProcInfo>(procTable.getRowCount());
        while (procTable.advanceRow()) {
            String procName = "<unknown>";
            try {
                procName = procTable.getString(2);
                JSONObject jsObj = new JSONObject(procTable.getString(6));
                boolean readOnly = jsObj.optBoolean("readOnly");
                boolean compound = jsObj.optBoolean("compound");
                boolean single = jsObj.optBoolean("singlePartition");
                int partitionParam = -1;
                int paramType = -1;
                if (single) {
                    partitionParam = jsObj.getInt("partitionParameter");
                    paramType = jsObj.getInt("partitionParameterType");
                }
                newProcInfoMap.put(procName, new ProcInfo(single, compound, readOnly, partitionParam, paramType));
            }
            catch (JSONException ex) {
                if (++badJson > 10) continue;
                this.logError("Catalog parse error for procedure '%s'", procName);
            }
        }
        this.procInfoMap.set(Collections.unmodifiableMap(newProcInfoMap));
    }

    private void refreshTopology(long delay) {
        if (!(this.isShutdown || this.connectionList.isEmpty() || this.subscriptionTaskPending.get() || this.topoRefreshTaskPending.getAndSet(true))) {
            this.execService.schedule(new TopologyRefreshTask(), delay, TimeUnit.NANOSECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void refreshPartitionKeys(Consumer<Throwable> waiter) {
        if (!this.isShutdown && !this.connectionList.isEmpty()) {
            long age = System.currentTimeMillis() - this.partitionKeysTimestamp.get();
            if (TimeUnit.MILLISECONDS.toNanos(age) > this.partitionKeysCacheRefresh) {
                List<Consumer<Throwable>> list = this.partitionKeysWaiters;
                synchronized (list) {
                    this.partitionKeysWaiters.add(waiter);
                }
                if (!this.partitionKeysUpdateInProgress.getAndSet(true)) {
                    this.execService.schedule(new PartitionKeysTask(), 0L, TimeUnit.NANOSECONDS);
                }
            } else {
                waiter.accept(null);
            }
        } else {
            waiter.accept(new RuntimeException("no connection available"));
        }
    }

    private void partitionKeysCompletion(ClientResponse resp, Throwable th) {
        if (!this.checkSystemResponse(resp, th, "@GetPartitionKeys INTEGER", 1)) {
            if (th == null) {
                th = new RuntimeException("Partition keys cannot be determined");
            }
            this.notifyPartitionKeysWaiters(th);
            return;
        }
        HashMap<Integer, Integer> newMap = new HashMap<Integer, Integer>();
        VoltTable keyInfo = resp.getResults()[0];
        while (keyInfo.advanceRow()) {
            Integer id = (int)keyInfo.getLong("PARTITION_ID");
            Integer key = (int)keyInfo.getLong("PARTITION_KEY");
            newMap.put(id, key);
        }
        this.partitionKeysTimestamp.set(System.currentTimeMillis());
        this.partitionKeys.set(newMap);
        this.notifyPartitionKeysWaiters(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyPartitionKeysWaiters(Throwable th) {
        ArrayList<Consumer<Throwable>> waiters;
        this.partitionKeysUpdateInProgress.set(false);
        List<Consumer<Throwable>> list = this.partitionKeysWaiters;
        synchronized (list) {
            waiters = new ArrayList<Consumer<Throwable>>(this.partitionKeysWaiters);
            this.partitionKeysWaiters.clear();
        }
        for (Consumer consumer : waiters) {
            consumer.accept(th);
        }
    }

    private void scheduleFirstConnection(Set<HostInfo> hosts, long delay) {
        if (this.autoConnectionMgmt && !this.isShutdown && !hosts.isEmpty() && !this.connectionTaskPending.getAndSet(true)) {
            this.execService.schedule(new FirstConnectionTask(hosts), delay, TimeUnit.NANOSECONDS);
        }
    }

    private void scheduleConnectionTask(Set<Integer> hostIds, long delay) {
        if (this.autoConnectionMgmt && !this.isShutdown && !hostIds.isEmpty() && !this.connectionTaskPending.getAndSet(true)) {
            this.execService.schedule(new ConnectionInitTask(hostIds), delay, TimeUnit.NANOSECONDS);
        }
    }

    private Map<Integer, HostInfo> getUnconnectedAddresses(Set<Integer> hostIds, VoltTable info) {
        if (this.infoTablePortKey == null) {
            this.infoTablePortKey = this.sniffForPortKey(info);
        }
        HashMap<Integer, String> addrMap = new HashMap<Integer, String>();
        HashMap<Integer, Integer> portMap = new HashMap<Integer, Integer>();
        while (info.advanceRow()) {
            Integer hostId;
            String key = info.getString("KEY");
            if (key.equals("IPADDRESS")) {
                hostId = (int)info.getLong("HOST_ID");
                String addr = info.getString("VALUE");
                addrMap.put(hostId, addr);
                continue;
            }
            if (!key.equals(this.infoTablePortKey)) continue;
            hostId = (int)info.getLong("HOST_ID");
            Integer port = Integer.valueOf(info.getString("VALUE"));
            portMap.put(hostId, port);
        }
        HashMap<Integer, HostInfo> hiMap = new HashMap<Integer, HostInfo>();
        for (Integer hostId : hostIds) {
            if (this.getConnectionForHost(hostId) != null) continue;
            String addr = (String)addrMap.get(hostId);
            Integer port = (Integer)portMap.get(hostId);
            if (addr != null && port != null) {
                hiMap.put(hostId, HostInfo.fromParts(addr, port));
                continue;
            }
            this.logError("Cannot connect to host %d, no address/port information found", hostId);
        }
        return hiMap;
    }

    private String sniffForPortKey(VoltTable info) {
        int peons = 0;
        int admins = 0;
        while (info.advanceRow()) {
            if (!info.getString("KEY").equals("ADMINPORT")) continue;
            Integer hostId = (int)info.getLong("HOST_ID");
            Integer adminPort = Integer.valueOf(info.getString("VALUE"));
            ClientConnection cxn = this.getConnectionForHost(hostId);
            if (cxn == null) continue;
            if (cxn.connection.getRemotePort() == adminPort.intValue()) {
                ++admins;
                continue;
            }
            ++peons;
        }
        info.resetRowPosition();
        return peons == 0 && admins != 0 ? "ADMINPORT" : "CLIENTPORT";
    }

    private static Thread newDaemonThread(Runnable func, String name) {
        Thread t = new Thread(func, name);
        t.setDaemon(true);
        return t;
    }

    Map<Long, Map<String, ClientStats>> getStatsSnapshot() {
        TreeMap<Long, Map<String, ClientStats>> retval = new TreeMap<Long, Map<String, ClientStats>>();
        for (ClientConnection conn : this.connectionList) {
            TreeMap<String, ClientStats> connMap = new TreeMap<String, ClientStats>();
            for (Map.Entry<String, ClientStats> ent : conn.stats.entrySet()) {
                connMap.put(ent.getKey(), (ClientStats)ent.getValue().clone());
            }
            retval.put(conn.connectionId(), connMap);
        }
        return retval;
    }

    Map<Long, ClientIOStats> getIOStatsSnapshot() {
        TreeMap<Long, ClientIOStats> retval = new TreeMap<Long, ClientIOStats>();
        HashSet<Long> liveConn = new HashSet<Long>();
        for (ClientConnection conn : this.connectionList) {
            liveConn.add(conn.connectionId());
        }
        for (IOStatsData data : this.networkPool.getIOStats(false)) {
            if (!liveConn.contains(data.getConnectionId())) continue;
            retval.put(data.getConnectionId(), new ClientIOStats(data));
        }
        return retval;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Map<Integer, ClientAffinityStats> getAffinityStatsSnapshot() {
        HashMap<Integer, ClientAffinityStats> retval = new HashMap<Integer, ClientAffinityStats>();
        Map<Integer, ClientAffinityStats> map = this.clientAffinityStats;
        synchronized (map) {
            for (Map.Entry<Integer, ClientAffinityStats> ent : this.clientAffinityStats.entrySet()) {
                retval.put(ent.getKey(), (ClientAffinityStats)ent.getValue().clone());
            }
        }
        return retval;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public VoltBulkLoader newBulkLoader(String tableName, int maxBatchSize, boolean upsertMode, BulkLoaderFailureCallBack failureCallback, BulkLoaderSuccessCallback successCallback) throws Exception {
        Client2Impl client2Impl = this;
        synchronized (client2Impl) {
            if (this.bulkState == null) {
                this.bulkState = new BulkLoaderState(this);
            }
        }
        return this.bulkState.newBulkLoader(tableName, maxBatchSize, upsertMode, failureCallback, successCallback);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean waitForTopology(long timeout, TimeUnit unit) {
        boolean ready = false;
        long start = System.currentTimeMillis();
        try {
            Object object = this.hashinatorReady;
            synchronized (object) {
                long now;
                for (long remaining = unit.toMillis(Math.max(timeout, 0L)); !(ready = this.hashinator.get() != null) && remaining > 0L; remaining -= Math.max(now - start, 1L)) {
                    this.hashinatorReady.wait(remaining);
                    now = System.currentTimeMillis();
                    start = now;
                }
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        return ready;
    }

    public boolean autoConnectionMgmt() {
        return this.autoConnectionMgmt;
    }

    public int getPartitionForParameter(byte type, Object value) {
        HashinatorLite hashi = this.hashinator.get();
        return hashi != null ? hashi.getHashedPartitionForParameter(type, value) : -1;
    }

    private ClientResponse toSyncProcCall(CompletableFuture<ClientResponse> future) throws ProcCallException, IOException {
        ClientResponse resp = null;
        try {
            resp = future.get();
            if (resp.getStatus() == 1) {
                return resp;
            }
        }
        catch (Exception ex) {
            this.throwMappedException(ex);
            return null;
        }
        throw new ProcCallException(resp);
    }

    private ClientResponseWithPartitionKey[] toSyncAllPartCall(CompletableFuture<ClientResponseWithPartitionKey[]> future) throws IOException {
        try {
            return future.get();
        }
        catch (Exception ex) {
            this.throwMappedException(ex);
            return null;
        }
    }

    private <T> T toSyncReturn(CompletableFuture<T> future) throws IOException {
        try {
            return future.get();
        }
        catch (Exception ex) {
            this.throwMappedException(ex);
            return null;
        }
    }

    private void throwMappedException(Exception ex) throws IOException {
        if ((ex = this.unwrapException(ex)) instanceof IOException) {
            throw (IOException)ex;
        }
        if (ex instanceof RuntimeException) {
            throw (RuntimeException)ex;
        }
        throw new GeneralException(ex);
    }

    private Exception unwrapException(Exception ex) {
        Throwable cause;
        while ((ex instanceof ExecutionException || ex instanceof CompletionException) && (cause = ex.getCause()) != null && cause instanceof Exception) {
            ex = (Exception)cause;
        }
        return ex;
    }

    private void logError(String msg, Object ... args) {
        if (args.length > 0) {
            msg = String.format(msg, args);
        }
        this.errorLog.accept(msg);
    }

    private void printError(String msg) {
        System.err.println("%%% " + msg);
    }

    private void debug(String msg, Object ... args) {
    }

    private static class PrioOrder
    implements Comparator<RequestContext> {
        private PrioOrder() {
        }

        @Override
        public int compare(RequestContext a, RequestContext b) {
            int cmp = Integer.compare(a.invocation.getRequestPriority(), b.invocation.getRequestPriority());
            if (cmp == 0) {
                cmp = Long.compare(a.sequence, b.sequence);
            }
            return cmp;
        }
    }

    private class TimeoutTask
    implements Runnable {
        private TimeoutTask() {
        }

        @Override
        public void run() {
            try {
                long now = System.nanoTime();
                for (ClientConnection cxn : Client2Impl.this.connectionList) {
                    long sinceLastResponse = Math.max(now - cxn.lastResponseTime, 1L);
                    if (cxn.outstandingPing && sinceLastResponse > Client2Impl.this.connectionResponseTimeout) {
                        Client2Impl.this.logError("Connection to %s timed out", cxn.getServer());
                        cxn.connection.unregister();
                    }
                    if (cxn.outstandingPing || sinceLastResponse <= Client2Impl.this.connectionResponseTimeout / 3L) continue;
                    cxn.outstandingPing = true;
                    Client2Impl.this.callSystemProcedure(cxn, (r, t) -> {
                        cxn.outstandingPing = false;
                    }, "@Ping", new Object[0]);
                }
                int timedOut = 0;
                for (Long handle : Client2Impl.this.activeHandles) {
                    long delta;
                    RequestContext req = Client2Impl.this.requestMap.get(handle);
                    if (req == null || (delta = Math.max(now - req.startTime, 1L)) <= req.timeout) continue;
                    Client2Impl.this.completeRequestOnTimeout(req, delta);
                    ++timedOut;
                }
                if (timedOut > 0 && Client2Impl.this.requestBackpressureOn) {
                    Client2Impl.this.reportRequestBackpressure(false);
                }
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in timeout task: %s", ex.getMessage());
            }
        }
    }

    private class ClientConnection
    extends VoltProtocolHandler {
        private final BlockingQueue<RequestContext> pending = Client2Impl.allocateQueue();
        private final HostInfo server;
        private Thread worker;
        private Connection connection;
        private int hostId = -1;
        private final Object backpressureLock = new Object();
        private boolean backpressure;
        private final Map<String, ClientStats> stats = new ConcurrentHashMap<String, ClientStats>();
        private ClientConnectionRequestStats connectionRequestStats;
        volatile boolean connected;
        volatile long lastResponseTime;
        volatile boolean outOfService;
        boolean outstandingPing;

        ClientConnection(HostInfo s) {
            this.server = s;
        }

        HostInfo getServer() {
            return this.server;
        }

        void setConnectionRequestStats(ClientConnectionRequestStats stats) {
            this.connectionRequestStats = stats;
        }

        void setConnection(Connection c, int i) {
            this.connection = c;
            this.hostId = i;
            this.connected = true;
        }

        Connection getConnection() {
            return this.connection;
        }

        boolean isConnected() {
            return this.connected;
        }

        boolean isOutOfService() {
            return this.outOfService;
        }

        void setOutOfService() {
            this.outOfService = true;
        }

        void start() {
            String name = String.format("Client2-Worker-%d", this.connection.connectionId());
            this.worker = new Thread(Client2Impl.this.workerGroup, () -> Client2Impl.this.connectionWorker(this), name);
            this.worker.setDaemon(true);
            this.worker.start();
        }

        void enqueue(RequestContext req) {
            if (this.connected && !this.pending.offer(req)) {
                throw new IllegalStateException("Internal error: unbounded queue refused offer");
            }
        }

        RequestContext dequeue() throws InterruptedException {
            return this.pending.take();
        }

        void clearQueue() {
            this.pending.clear();
        }

        void writeToNetwork(ByteBuffer buf) {
            this.connection.writeStream().enqueue(buf);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        ClientStats clientStats(String procName) {
            ClientStats st = null;
            Map<String, ClientStats> map = this.stats;
            synchronized (map) {
                st = this.stats.get(procName);
                if (st == null) {
                    st = new ClientStats();
                    st.m_connectionId = this.connectionId();
                    st.m_hostname = this.server.getHost();
                    st.m_port = this.server.getPort();
                    st.m_procName = procName;
                    st.m_startTS = System.currentTimeMillis();
                    st.m_endTS = Long.MIN_VALUE;
                    this.stats.put(procName, st);
                }
            }
            return st;
        }

        @Override
        public void handleMessage(ByteBuffer buf, Connection conn) {
            this.lastResponseTime = System.nanoTime();
            Client2Impl.this.responseService.submit(() -> Client2Impl.this.handleResponse(this, buf, this.lastResponseTime));
        }

        @Override
        public int getMaxRead() {
            return Integer.MAX_VALUE;
        }

        @Override
        public Runnable onBackPressure() {
            return new Runnable(){

                @Override
                public void run() {
                    Client2Impl.this.networkBackpressure(ClientConnection.this, true);
                }
            };
        }

        @Override
        public Runnable offBackPressure() {
            return new Runnable(){

                @Override
                public void run() {
                    Client2Impl.this.networkBackpressure(ClientConnection.this, false);
                }
            };
        }

        @Override
        public QueueMonitor writestreamMonitor() {
            return null;
        }

        @Override
        public void stopping(Connection conn) {
            super.stopping(conn);
            this.connected = false;
            this.worker.interrupt();
            Client2Impl.this.removeConnection(this);
        }
    }

    private class UserConnectionTask
    implements Runnable {
        private final List<HostInfo> servers;
        private final long startTime;
        private final long timeout;
        private final long retryDelay;
        private final CompletableFuture<Void> future;

        UserConnectionTask(List<HostInfo> s, long t, long d, CompletableFuture<Void> f) {
            this.servers = new ArrayList<HostInfo>(s);
            this.startTime = System.nanoTime();
            this.timeout = t;
            this.retryDelay = d;
            this.future = f;
        }

        @Override
        public void run() {
            boolean retry = true;
            Throwable lastExc = null;
            Iterator<HostInfo> it = this.servers.iterator();
            while (it.hasNext()) {
                HostInfo server = it.next();
                try {
                    Client2Impl.this.createConnection(server);
                    this.future.complete(null);
                    return;
                }
                catch (IOException ex) {
                    retry &= System.nanoTime() - this.startTime < this.timeout;
                    lastExc = ex;
                }
                catch (Exception ex) {
                    Client2Impl.this.logError("Unexpected exception, connect to %s failed: %s", server, ex.getMessage());
                    it.remove();
                    lastExc = ex;
                }
            }
            if (retry && !this.servers.isEmpty()) {
                Client2Impl.this.execService.schedule(this, this.retryDelay, TimeUnit.NANOSECONDS);
            } else {
                Object err = "Failed to connect to cluster";
                if (lastExc != null && lastExc.getMessage() != null) {
                    err = (String)err + ": " + lastExc.getMessage();
                }
                this.future.completeExceptionally(new ConnectException((String)err));
            }
        }
    }

    private static class HostInfo {
        private String name;
        private String addr;
        private int type;
        private int port;

        private HostInfo(String n, String a, int t, int p) {
            this.name = n;
            this.addr = a;
            this.type = t;
            this.port = p;
        }

        private static HostInfo create(String host, int port) {
            InetAddress ia = null;
            try {
                ia = InetAddress.getByName(host);
            }
            catch (UnknownHostException ex) {
                return new HostInfo(host, null, 0, port);
            }
            String[] parts = ia.toString().split("/", 2);
            if (parts.length != 2 || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Unexpected inet address string form: " + ia);
            }
            String addr = parts[1];
            String name = parts[0];
            if (name.isEmpty()) {
                name = ReverseDNSCache.hostname(ia);
            }
            return new HostInfo(name, addr, HostInfo.addressType(ia), port);
        }

        private static int addressType(InetAddress ia) {
            return ia instanceof Inet4Address ? 4 : (ia instanceof Inet6Address ? 6 : 0);
        }

        private static void ensurePort(int port) {
            if (port <= 0) {
                throw new IllegalArgumentException("Invalid port number: " + port);
            }
        }

        static HostInfo fromString(String str, int defPort) {
            HostAndPort hap = HostAndPort.fromString(str).withDefaultPort(defPort).requireBracketsForIPv6();
            HostInfo.ensurePort(hap.getPort());
            return HostInfo.create(hap.getHost(), hap.getPort());
        }

        static HostInfo fromParts(String host, int port) {
            HostAndPort hap = HostAndPort.fromParts(host, port);
            HostInfo.ensurePort(hap.getPort());
            return HostInfo.create(hap.getHost(), hap.getPort());
        }

        boolean unresolvedHostName() {
            return this.name != null && this.addr == null;
        }

        String getHost() {
            return this.name != null ? this.name : this.addr;
        }

        String getHostName() {
            return this.name;
        }

        String getHostAddress() {
            return this.addr;
        }

        int getPort() {
            return this.port;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof HostInfo)) {
                return false;
            }
            HostInfo that = (HostInfo)obj;
            return Objects.equals(this.name, that.name) && Objects.equals(this.addr, that.addr) && this.port == that.port;
        }

        public int hashCode() {
            return Objects.hash(this.name, this.addr, this.port);
        }

        public String toString() {
            return this.name != null ? String.format("%s:%d", this.name, this.port) : (this.type == 6 ? String.format("[%s]:%d", this.addr, this.port) : String.format("%s:%s", this.addr, this.port));
        }
    }

    private static class RequestContext {
        static final AtomicLong sequencer = new AtomicLong(0L);
        final long sequence = sequencer.incrementAndGet();
        final CompletableFuture<ClientResponse> future;
        final ProcedureInvocation invocation;
        final long startTime;
        final long timeout;
        final ClientConnection cxn;
        boolean holdsPermit;
        volatile Future<?> timer;

        public RequestContext(CompletableFuture<ClientResponse> future, ProcedureInvocation invocation, long timeout, ClientConnection cxn) {
            this.future = future;
            this.invocation = invocation;
            this.startTime = System.nanoTime();
            this.timeout = timeout;
            this.cxn = cxn;
        }
    }

    private class AllPartitionCallContext {
        final CompletableFuture<ClientResponseWithPartitionKey[]> future = new CompletableFuture();
        final long clientTimeout;
        final long queryTimeout;
        final int requestPrio;
        final String procName;
        final Object[] params;

        AllPartitionCallContext(long ct, long qt, int rp, String proc, Object[] pars) {
            this.clientTimeout = ct;
            this.queryTimeout = qt;
            this.requestPrio = rp;
            this.procName = proc;
            this.params = pars;
        }
    }

    private static class LocalTimeoutException
    extends Exception {
        long elapsed;
        long timeout;

        LocalTimeoutException(long el, long tmo) {
            super("timeout");
            this.elapsed = el;
            this.timeout = tmo;
        }
    }

    private static class SerializationException
    extends Exception {
        SerializationException(String msg) {
            super(msg);
        }
    }

    private class SingleTimeoutTask
    implements Runnable {
        private final RequestContext req;

        SingleTimeoutTask(RequestContext req) {
            this.req = req;
        }

        @Override
        public void run() {
            this.req.timer = null;
            if (Client2Impl.this.activeHandles.contains(this.req.invocation.getHandle())) {
                long delta = Math.max(System.nanoTime() - this.req.startTime, 1L);
                Client2Impl.this.completeRequestOnTimeout(this.req, delta);
            }
        }
    }

    private static class ProcInfo {
        static final int PARAMETER_NONE = -1;
        final Type procType;
        final boolean readOnly;
        final int partitionParameter;
        final int parameterType;

        ProcInfo(boolean single, boolean compound, boolean readOnly, int partitionParameter, int parameterType) {
            this.procType = single ? Type.SINGLE : (compound ? Type.COMPOUND : Type.MULTI);
            this.readOnly = readOnly;
            this.partitionParameter = single ? partitionParameter : -1;
            this.parameterType = single ? parameterType : -1;
        }

        static enum Type {
            SINGLE,
            MULTI,
            COMPOUND;

        }
    }

    private class SubscriberTask
    implements Runnable {
        private SubscriberTask() {
        }

        @Override
        public void run() {
            try {
                ClientConnection cxn;
                Client2Impl.this.subscribedConnection = cxn = Client2Impl.this.arbitraryConnection();
                Client2Impl.this.subscriptionTaskPending.set(false);
                Client2Impl.this.callSystemProcedure(cxn, Client2Impl.this::subscribeCompletion, "@Subscribe", "TOPOLOGY");
                Client2Impl.this.callSystemProcedure(cxn, Client2Impl.this::topoStatsCompletion, "@Statistics", "TOPO");
                Client2Impl.this.callSystemProcedure(cxn, Client2Impl.this::procedureCatalogCompletion, "@SystemCatalog", "PROCEDURES");
            }
            catch (UnavailableException ex) {
                Client2Impl.this.subscriptionTaskPending.set(false);
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in subscriber task: %s", ex.getMessage());
                Client2Impl.this.subscribedConnection = null;
                Client2Impl.this.subscriptionTaskPending.set(false);
                Client2Impl.this.ensureSubscription(Client2Impl.this.resubscriptionFailureDelay);
            }
        }
    }

    private static class UnavailableException
    extends Exception {
        UnavailableException(String msg) {
            super(msg);
        }
    }

    private class TopologyRefreshTask
    implements Runnable {
        private TopologyRefreshTask() {
        }

        @Override
        public void run() {
            try {
                ClientConnection cxn = Client2Impl.this.subscribedConnection;
                if (cxn == null) {
                    cxn = Client2Impl.this.arbitraryConnection();
                }
                Client2Impl.this.topoRefreshTaskPending.set(false);
                Client2Impl.this.callSystemProcedure(cxn, Client2Impl.this::topoStatsCompletion, "@Statistics", "TOPO");
            }
            catch (UnavailableException ex) {
                Client2Impl.this.topoRefreshTaskPending.set(false);
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in topology refresh task: %s", ex.getMessage());
                Client2Impl.this.topoRefreshTaskPending.set(false);
                Client2Impl.this.refreshTopology(Client2Impl.this.topoRefreshFailureDelay);
            }
        }
    }

    private class PartitionKeysTask
    implements Runnable {
        private PartitionKeysTask() {
        }

        @Override
        public void run() {
            try {
                ClientConnection cxn = Client2Impl.this.subscribedConnection;
                if (cxn == null) {
                    cxn = Client2Impl.this.arbitraryConnection();
                }
                Client2Impl.this.callSystemProcedure(cxn, Client2Impl.this::partitionKeysCompletion, "@GetPartitionKeys", "INTEGER");
            }
            catch (UnavailableException ex) {
                Client2Impl.this.notifyPartitionKeysWaiters(ex);
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in partition-keys task: %s", ex.getMessage());
                Client2Impl.this.notifyPartitionKeysWaiters(ex);
            }
        }
    }

    private class FirstConnectionTask
    implements Runnable {
        private final Set<HostInfo> hosts;

        FirstConnectionTask(Set<HostInfo> hosts) {
            this.hosts = hosts;
        }

        @Override
        public void run() {
            boolean retry = true;
            try {
                for (HostInfo host : this.hosts) {
                    try {
                        Client2Impl.this.createConnection(host);
                        retry = false;
                        break;
                    }
                    catch (IOException iOException) {
                    }
                    catch (Exception ex) {
                        Client2Impl.this.logError("Unexpected exception, connect to %s failed: %s", host, ex.getMessage());
                    }
                }
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in first connection task: %s", ex.getMessage());
            }
            Client2Impl.this.connectionTaskPending.set(false);
            if (retry) {
                Client2Impl.this.scheduleFirstConnection(this.hosts, Client2Impl.this.reconnectRetryDelay);
            }
        }
    }

    private class ConnectionInitTask
    implements Runnable {
        private final Set<Integer> hostIds;

        ConnectionInitTask(Set<Integer> hostIds) {
            this.hostIds = hostIds;
        }

        @Override
        public void run() {
            try {
                ClientConnection cxn = Client2Impl.this.arbitraryConnection();
                Client2Impl.this.callSystemProcedure(cxn, this::hostInfoCompletion, "@SystemInformation", "OVERVIEW");
            }
            catch (UnavailableException ex) {
                Client2Impl.this.connectionTaskPending.set(false);
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in connection init task: %s", ex.getMessage());
                Client2Impl.this.connectionTaskPending.set(false);
                Client2Impl.this.scheduleConnectionTask(this.hostIds, Client2Impl.this.reconnectRetryDelay);
            }
        }

        void hostInfoCompletion(ClientResponse resp, Throwable th) {
            if (!Client2Impl.this.checkSystemResponse(resp, th, "@SystemInformation OVERVIEW", 1)) {
                Client2Impl.this.connectionTaskPending.set(false);
                Client2Impl.this.scheduleConnectionTask(this.hostIds, Client2Impl.this.reconnectRetryDelay);
                return;
            }
            Client2Impl.this.execService.schedule(new ConnectionTask(this.hostIds, resp.getResults()[0]), 0L, TimeUnit.NANOSECONDS);
        }
    }

    private class ConnectionTask
    implements Runnable {
        private final Set<Integer> hostIds;
        private final VoltTable info;

        ConnectionTask(Set<Integer> hostIds, VoltTable info) {
            this.hostIds = hostIds;
            this.info = info;
        }

        @Override
        public void run() {
            boolean retry = false;
            Map<Integer, HostInfo> unconnected = null;
            try {
                unconnected = Client2Impl.this.getUnconnectedAddresses(this.hostIds, this.info);
                Iterator<Map.Entry<Integer, HostInfo>> it = unconnected.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Integer, HostInfo> ent = it.next();
                    Integer hostId = ent.getKey();
                    HostInfo host = ent.getValue();
                    try {
                        Client2Impl.this.createConnection(host);
                        it.remove();
                    }
                    catch (IOException ex) {
                        retry = true;
                    }
                    catch (Exception ex) {
                        Client2Impl.this.logError("Unexpected exception, connect to %s failed: %s", host, ex.getMessage());
                        retry = true;
                    }
                }
            }
            catch (Exception ex) {
                Client2Impl.this.logError("Unexpected exception in connection task: %s", ex.getMessage());
                retry = true;
            }
            Client2Impl.this.connectionTaskPending.set(false);
            if (retry) {
                Client2Impl.this.scheduleConnectionTask(unconnected != null ? unconnected.keySet() : this.hostIds, Client2Impl.this.reconnectRetryDelay);
            }
        }
    }
}

