/*
 * Decompiled with CFR 0.152.
 */
package org.voltdb.client;

import com.google_voltpatches.common.base.Strings;
import com.google_voltpatches.common.base.Throwables;
import com.google_voltpatches.common.collect.ImmutableMap;
import com.google_voltpatches.common.collect.ImmutableSet;
import com.google_voltpatches.common.collect.ImmutableSortedMap;
import com.google_voltpatches.common.collect.Maps;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.security.auth.Subject;
import org.cliffc_voltpatches.high_scale_lib.NonBlockingHashMap;
import org.json_voltpatches.JSONException;
import org.json_voltpatches.JSONObject;
import org.voltcore.network.CipherExecutor;
import org.voltcore.network.Connection;
import org.voltcore.network.QueueMonitor;
import org.voltcore.network.VoltNetworkPool;
import org.voltcore.network.VoltProtocolHandler;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.EstTime;
import org.voltdb.ClientResponseImpl;
import org.voltdb.VoltTable;
import org.voltdb.client.ClientAffinityStats;
import org.voltdb.client.ClientAuthScheme;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientConnectionRequestStats;
import org.voltdb.client.ClientIOStats;
import org.voltdb.client.ClientImpl;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientStats;
import org.voltdb.client.ClientStatsContext;
import org.voltdb.client.ClientStatusListenerExt;
import org.voltdb.client.ConnectionUtil;
import org.voltdb.client.HashinatorLite;
import org.voltdb.client.NoConnectionsException;
import org.voltdb.client.ProcCallException;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.client.ProcedureInvocation;
import org.voltdb.client.RateLimiter;

class Distributer {
    private static final long MINIMUM_LONG_RUNNING_SYSTEM_CALL_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30L);
    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1L);
    static int RESUBSCRIPTION_DELAY_MS = Integer.getInteger("RESUBSCRIPTION_DELAY_MS", 10000);
    static final long USE_DEFAULT_CLIENT_TIMEOUT = 0L;
    static long TOPO_AWARE_PROC_TIMEOUT_NS = Long.getLong("TOPO_AWARE_PROC_TIMEOUT_NS", ClientConfig.DEFAULT_PROCEDURE_TIMEOUT_NANOS);
    private final SslContext m_sslContext;
    private final boolean m_sslHostCheck;
    private final boolean m_useMultipleThreads;
    private final long m_procedureCallTimeoutNanos;
    private final long m_connectionResponseTimeoutNanos;
    private final Subject m_subject;
    private int m_sysRequestPrio = -1;
    private final ArrayList<ClientStatusListenerExt> m_listeners = new ArrayList();
    public final AtomicLong m_sysHandle = new AtomicLong(-1L);
    private final CopyOnWriteArrayList<NodeConnection> m_connections = new CopyOnWriteArrayList();
    private final Map<Integer, NodeConnection> m_hostIdToConnection = new HashMap<Integer, NodeConnection>();
    private final VoltNetworkPool m_network;
    private int m_nextConnection = 0;
    final RateLimiter m_rateLimiter = new RateLimiter();
    private boolean m_lastBackpressureReport;
    private int m_backpressureQueueLimit = 100;
    private int m_maxQueuedBytes = 262144;
    private boolean m_topologyChangeAware;
    private final AtomicReference<ImmutableSet<Integer>> m_unconnectedHosts = new AtomicReference();
    private final AtomicBoolean m_createConnectionUponTopoChangeInProgress = new AtomicBoolean(false);
    private NodeConnection m_subscribedConnection;
    private boolean m_subscriptionRequestPending;
    private HashinatorLite m_hashinator;
    private final Map<Integer, NodeConnection> m_partitionMasters = new HashMap<Integer, NodeConnection>();
    private final Map<Integer, ClientAffinityStats> m_clientAffinityStats = new HashMap<Integer, ClientAffinityStats>();
    private final Map<Integer, ClientConnectionRequestStats> m_clientConnectionRequestStats = new HashMap<Integer, ClientConnectionRequestStats>();
    private final AtomicReference<ImmutableMap<Integer, Integer>> m_partitionKeys = new AtomicReference();
    private final AtomicReference<ImmutableMap<Integer, Integer>> m_partitionKeyMap = new AtomicReference();
    private final AtomicReference<ClientResponse> m_partitionUpdateStatus = new AtomicReference();
    private final AtomicLong m_lastPartitionKeyFetched = new AtomicLong(0L);
    private final AtomicReference<ImmutableSortedMap<String, Procedure>> m_procedureInfo = new AtomicReference();
    private boolean m_fetchedCatalog;
    private final ScheduledExecutorService m_tmoExec = Executors.newSingleThreadScheduledExecutor(CoreUtils.getThreadFactory("client-timeout-thread"));
    private ScheduledFuture<?> m_timeoutReaperHandle;
    private final ScheduledExecutorService m_subsExec = Executors.newSingleThreadScheduledExecutor(CoreUtils.getThreadFactory("client-subscription-thread"));
    private CipherExecutor m_cipherService;
    private Object[] m_clusterInstanceId;
    private String m_buildString;
    private AtomicBoolean m_shutdown = new AtomicBoolean(false);

    private static boolean isLongOp(String procName) {
        return procName.startsWith("@") && (procName.equals("@UpdateApplicationCatalog") || procName.equals("@UpdateDeployment") || procName.equals("@SnapshotSave"));
    }

    void drain() throws InterruptedException {
        boolean more;
        long sleep = 500L;
        do {
            more = false;
            for (NodeConnection cxn : this.m_connections) {
                more = !cxn.m_callbacks.isEmpty();
                if (!more) continue;
                break;
            }
            if (!more) continue;
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(sleep));
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (sleep >= 5000L) continue;
            sleep += 500L;
        } while (more);
    }

    public synchronized void setBackpressureQueueThresholds(int reqLimit, int byteLimit) {
        this.m_maxQueuedBytes = byteLimit;
        this.m_backpressureQueueLimit = reqLimit;
        for (NodeConnection cxn : this.m_connections) {
            cxn.m_connection.writeStream().setPendingWriteBackpressureThreshold(reqLimit);
        }
    }

    Distributer() {
        this(false, ClientConfig.DEFAULT_PROCEDURE_TIMEOUT_NANOS, 120000L, null, null, false);
    }

    Distributer(boolean useMultipleThreads, long procedureCallTimeoutNanos, long connectionResponseTimeoutMS, Subject subject, SslContext sslContext, boolean sslHostCheck) {
        this.m_useMultipleThreads = useMultipleThreads;
        this.m_sslContext = sslContext;
        if (this.m_sslContext != null) {
            this.m_cipherService = CipherExecutor.CLIENT;
            this.m_cipherService.startup();
        } else {
            this.m_cipherService = null;
        }
        this.m_sslHostCheck = sslHostCheck;
        this.m_network = new VoltNetworkPool(this.m_useMultipleThreads ? Math.max(1, CoreUtils.availableProcessors() / 4) : 1, 1, "Client");
        this.m_network.start();
        this.m_procedureCallTimeoutNanos = procedureCallTimeoutNanos;
        this.m_connectionResponseTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(connectionResponseTimeoutMS);
        this.m_timeoutReaperHandle = this.m_tmoExec.scheduleAtFixedRate(new CallExpiration(), 1L, 1L, TimeUnit.SECONDS);
        this.m_subject = subject;
    }

    void createConnection(String host, String username, String password, int port, ClientAuthScheme scheme) throws UnknownHostException, IOException {
        byte[] hashedPassword = ConnectionUtil.getHashedPassword(scheme, password);
        this.createConnectionWithHashedCredentials(host, username, hashedPassword, port, scheme);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void createConnectionWithHashedCredentials(String host, String username, byte[] hashedPassword, int port, ClientAuthScheme scheme) throws UnknownHostException, IOException {
        SSLEngine sslEngine = null;
        if (this.m_sslContext != null) {
            sslEngine = this.m_sslContext.newEngine(ByteBufAllocator.DEFAULT, host, port);
            if (this.m_sslHostCheck) {
                SSLParameters parms = sslEngine.getSSLParameters();
                parms.setEndpointIdentificationAlgorithm("HTTPS");
                sslEngine.setSSLParameters(parms);
            }
        }
        Object[] socketChannelAndInstanceIdAndBuildString = ConnectionUtil.getAuthenticatedConnection(host, username, hashedPassword, port, this.m_subject, scheme, sslEngine, TimeUnit.NANOSECONDS.toMillis(this.m_connectionResponseTimeoutNanos));
        SocketChannel aChannel = (SocketChannel)socketChannelAndInstanceIdAndBuildString[0];
        long[] instanceIdWhichIsTimestampAndLeaderIp = (long[])socketChannelAndInstanceIdAndBuildString[1];
        int hostId = (int)instanceIdWhichIsTimestampAndLeaderIp[0];
        ClientConnectionRequestStats stats = new ClientConnectionRequestStats(hostId, host);
        NodeConnection cxn = new NodeConnection(instanceIdWhichIsTimestampAndLeaderIp, stats, hostId);
        Connection c = null;
        try {
            c = this.m_network.registerChannel(aChannel, cxn, this.m_cipherService, sslEngine, EstTime::currentTimeMillis);
        }
        catch (Exception e) {
            try {
                aChannel.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            Throwables.propagate(e);
        }
        cxn.setConnection(c);
        Distributer distributer = this;
        synchronized (distributer) {
            cxn.m_connection.writeStream().setPendingWriteBackpressureThreshold(this.m_backpressureQueueLimit);
            if (this.m_connections.size() == 0) {
                this.m_clusterInstanceId = null;
            }
            if (this.m_clusterInstanceId == null) {
                long timestamp = instanceIdWhichIsTimestampAndLeaderIp[2];
                int addr = (int)instanceIdWhichIsTimestampAndLeaderIp[3];
                this.m_clusterInstanceId = new Object[]{timestamp, addr};
            } else if ((Long)this.m_clusterInstanceId[0] != instanceIdWhichIsTimestampAndLeaderIp[2] || ((Integer)this.m_clusterInstanceId[1]).longValue() != instanceIdWhichIsTimestampAndLeaderIp[3]) {
                c.unregister();
                throw new IOException("Cluster instance id mismatch. Current is " + this.m_clusterInstanceId[0] + "," + this.m_clusterInstanceId[1] + " and server's was " + instanceIdWhichIsTimestampAndLeaderIp[2] + "," + instanceIdWhichIsTimestampAndLeaderIp[3]);
            }
            this.m_buildString = (String)socketChannelAndInstanceIdAndBuildString[2];
            if (!cxn.isOutOfService()) {
                this.m_connections.add(cxn);
            }
            this.m_hostIdToConnection.put(hostId, cxn);
            this.addConnectionRequestStats(hostId, stats);
        }
        if (this.m_subscribedConnection == null) {
            this.subscribeToNewNode();
        }
    }

    private void addConnectionRequestStats(int hostId, ClientConnectionRequestStats stats) {
        Iterator<Map.Entry<Integer, ClientConnectionRequestStats>> e = this.m_clientConnectionRequestStats.entrySet().iterator();
        while (e.hasNext()) {
            if (!e.next().getValue().expired()) continue;
            e.remove();
        }
        this.m_clientConnectionRequestStats.put(hostId, stats);
    }

    public Map<Integer, ClientConnectionRequestStats> getConnectionRequestStats() {
        return this.m_clientConnectionRequestStats;
    }

    public void resetConnectionRequestStats() {
        for (ClientConnectionRequestStats e : this.m_clientConnectionRequestStats.values()) {
            e.reset();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeToNewNode() {
        if (this.m_shutdown.get()) {
            return;
        }
        NodeConnection cxn = null;
        Distributer distributer = this;
        synchronized (distributer) {
            this.m_subscribedConnection = null;
            if (this.m_connections.isEmpty()) {
                return;
            }
            cxn = this.m_connections.get(new Random().nextInt(this.m_connections.size()));
            this.m_subscriptionRequestPending = true;
            this.m_subscribedConnection = cxn;
        }
        try {
            ProcedureInvocation spi = this.makeProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@Subscribe", "TOPOLOGY");
            cxn.createWork(System.nanoTime(), spi.getHandle(), spi.getProcName(), this.serializeSPI(spi), new SubscribeCallback(), true, TOPO_AWARE_PROC_TIMEOUT_NS, spi.getPartitionDestination());
            spi = this.makeProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@Statistics", "TOPO", 0);
            cxn.createWork(System.nanoTime(), spi.getHandle(), spi.getProcName(), this.serializeSPI(spi), new TopoUpdateCallback(), true, TOPO_AWARE_PROC_TIMEOUT_NS, spi.getPartitionDestination());
            if (!this.m_fetchedCatalog) {
                spi = this.makeProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@SystemCatalog", "PROCEDURES");
                cxn.createWork(System.nanoTime(), spi.getHandle(), spi.getProcName(), this.serializeSPI(spi), new ProcUpdateCallback(), true, TOPO_AWARE_PROC_TIMEOUT_NS, spi.getPartitionDestination());
            }
            this.refreshPartitionKeys(true);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    boolean queue(ProcedureInvocation invocation, ProcedureCallback cb, boolean ignoreBackpressure, long startNanos, long timeoutNanos) throws NoConnectionsException {
        assert (invocation != null);
        assert (cb != null);
        if (this.m_shutdown.get()) {
            return false;
        }
        CxnStatsData statsData = new CxnStatsData();
        int hashedPartition = this.findHashedPartition(invocation);
        NodeConnection cxn = this.findCxnForQueue(invocation, ignoreBackpressure, statsData, hashedPartition);
        if (cxn == null) {
            if (this.m_topologyChangeAware) {
                this.createConnectionsUponTopologyChange();
            }
            return false;
        }
        ByteBuffer buf = null;
        try {
            buf = this.serializeSPI(invocation);
        }
        catch (Exception e) {
            Throwables.propagate(e);
        }
        this.updateAffinityStats(statsData);
        cxn.createWork(startNanos, invocation.getHandle(), invocation.getProcName(), buf, cb, ignoreBackpressure, timeoutNanos, hashedPartition);
        if (this.m_topologyChangeAware) {
            this.createConnectionsUponTopologyChange();
        }
        return true;
    }

    boolean queueNonblocking(ProcedureInvocation invocation, ProcedureCallback cb, long startNanos, long timeoutNanos) throws NoConnectionsException {
        assert (invocation != null);
        assert (cb != null);
        if (this.m_shutdown.get()) {
            return false;
        }
        CxnStatsData statsData = new CxnStatsData();
        int hashedPartition = this.findHashedPartition(invocation);
        NodeConnection cxn = this.findCxnForQueue(invocation, false, statsData, hashedPartition);
        if (cxn == null) {
            if (this.m_topologyChangeAware) {
                this.createConnectionsUponTopologyChange();
            }
            return false;
        }
        ByteBuffer buf = null;
        try {
            buf = this.serializeSPI(invocation);
        }
        catch (Exception e) {
            Throwables.propagate(e);
        }
        boolean queued = cxn.createWorkNonblocking(startNanos, invocation.getHandle(), invocation.getProcName(), buf, cb, timeoutNanos, hashedPartition);
        if (queued) {
            this.updateAffinityStats(statsData);
        }
        if (this.m_topologyChangeAware) {
            this.createConnectionsUponTopologyChange();
        }
        return queued;
    }

    private int findHashedPartition(ProcedureInvocation invocation) {
        ImmutableSortedMap<String, Procedure> procedures = this.m_procedureInfo.get();
        Procedure procedureInfo = null;
        if (procedures != null) {
            procedureInfo = procedures.get(invocation.getProcName());
        }
        int hashedPartition = -1;
        if (invocation.hasPartitionDestination()) {
            hashedPartition = invocation.getPartitionDestination();
        } else if (this.m_hashinator != null && procedureInfo != null) {
            switch (procedureInfo.procType) {
                case SINGLE: {
                    if (procedureInfo.partitionParameter == -1 || procedureInfo.partitionParameter >= invocation.getPassedParamCount()) break;
                    hashedPartition = this.m_hashinator.getHashedPartitionForParameter(procedureInfo.partitionParameterType, invocation.getPartitionParamValue(procedureInfo.partitionParameter));
                    break;
                }
                case MULTI: {
                    hashedPartition = 16383;
                    break;
                }
            }
        }
        return hashedPartition;
    }

    private synchronized NodeConnection findCxnForQueue(ProcedureInvocation invocation, boolean ignoreBackpressure, CxnStatsData statsData, int hashedPartition) throws NoConnectionsException {
        int totalConnections = this.m_connections.size();
        if (totalConnections == 0) {
            if (!this.m_topologyChangeAware) {
                throw new NoConnectionsException("No connections.");
            }
            if (ignoreBackpressure) {
                throw new NoConnectionsException("No connections (and ignoreBackpressure set).");
            }
            this.reportBackpressure(true);
            return null;
        }
        NodeConnection cxn = null;
        statsData.stats = null;
        cxn = this.m_partitionMasters.get(hashedPartition);
        if (cxn != null && (!cxn.m_isConnected || cxn.isOutOfService())) {
            cxn = null;
        }
        boolean roundRobin = false;
        ArrayList<NodeConnection> localList = new ArrayList<NodeConnection>(this.m_connections.size());
        for (NodeConnection c : this.m_connections) {
            if (c.isOutOfService()) continue;
            localList.add(c);
        }
        int cxnCount = localList.size();
        for (int i = 0; cxn == null && i < localList.size(); ++i) {
            NodeConnection tmpCxn;
            if ((tmpCxn = (NodeConnection)localList.get(Math.abs(++this.m_nextConnection % cxnCount))).hadBackPressure() && !ignoreBackpressure) continue;
            cxn = tmpCxn;
            roundRobin = true;
        }
        ClientAffinityStats stats = this.m_clientAffinityStats.get(hashedPartition);
        if (stats == null) {
            stats = new ClientAffinityStats(hashedPartition);
            this.m_clientAffinityStats.put(hashedPartition, stats);
        }
        ImmutableSortedMap<String, Procedure> procedures = this.m_procedureInfo.get();
        Procedure procedureInfo = null;
        if (procedures != null) {
            procedureInfo = procedures.get(invocation.getProcName());
        }
        statsData.stats = stats;
        statsData.readOnly = procedureInfo != null && procedureInfo.readOnly;
        statsData.roundRobin = roundRobin;
        if (cxn == null) {
            this.reportBackpressure(true);
        }
        return cxn;
    }

    private void reportBackpressure(boolean bp) {
        if (this.m_lastBackpressureReport ^ bp || !bp) {
            this.m_lastBackpressureReport = bp;
            for (ClientStatusListenerExt sl : this.m_listeners) {
                sl.backpressure(bp);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateAffinityStats(CxnStatsData data) {
        if (data.stats != null) {
            ClientAffinityStats clientAffinityStats = data.stats;
            synchronized (clientAffinityStats) {
                if (data.roundRobin) {
                    if (data.readOnly) {
                        data.stats.addRrRead();
                    } else {
                        data.stats.addRrWrite();
                    }
                } else if (data.readOnly) {
                    data.stats.addAffinityRead();
                } else {
                    data.stats.addAffinityWrite();
                }
            }
        }
    }

    final void shutdown() throws InterruptedException {
        this.m_shutdown.set(true);
        this.m_rateLimiter.setNonblockingResumeHook(null);
        if (CoreUtils.isJunitTest()) {
            this.m_timeoutReaperHandle.cancel(true);
            this.m_tmoExec.shutdownNow();
            this.m_subsExec.shutdownNow();
        } else {
            this.m_timeoutReaperHandle.cancel(false);
            this.m_tmoExec.shutdown();
            this.m_subsExec.shutdown();
            this.m_tmoExec.awaitTermination(365L, TimeUnit.DAYS);
            this.m_subsExec.awaitTermination(365L, TimeUnit.DAYS);
        }
        this.m_network.shutdown();
        if (this.m_cipherService != null) {
            this.m_cipherService.shutdown();
            this.m_cipherService = null;
        }
    }

    void uncaughtException(ProcedureCallback cb, ClientResponse r, Throwable t) {
        boolean handledByClient = false;
        for (ClientStatusListenerExt csl : this.m_listeners) {
            if (csl instanceof ClientImpl.InternalClientStatusListener) continue;
            try {
                csl.uncaughtException(cb, r, t);
                handledByClient = true;
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (!handledByClient) {
            t.printStackTrace();
        }
    }

    synchronized void addClientStatusListener(ClientStatusListenerExt listener) {
        if (!this.m_listeners.contains(listener)) {
            this.m_listeners.add(listener);
        }
    }

    synchronized boolean removeClientStatusListener(ClientStatusListenerExt listener) {
        return this.m_listeners.remove(listener);
    }

    ClientStatsContext createStatsContext() {
        return new ClientStatsContext(this, this.getStatsSnapshot(), this.getIOStatsSnapshot(), this.getAffinityStatsSnapshot());
    }

    Map<Long, Map<String, ClientStats>> getStatsSnapshot() {
        TreeMap<Long, Map<String, ClientStats>> retval = new TreeMap<Long, Map<String, ClientStats>>();
        for (NodeConnection conn : this.m_connections) {
            TreeMap<String, ClientStats> connMap = new TreeMap<String, ClientStats>();
            for (Map.Entry<String, ClientStats> e : conn.m_stats.entrySet()) {
                connMap.put(e.getKey(), (ClientStats)e.getValue().clone());
            }
            retval.put(conn.connectionId(), connMap);
        }
        return retval;
    }

    Map<Long, ClientIOStats> getIOStatsSnapshot() {
        Set liveConnections = this.m_connections.stream().map(VoltProtocolHandler::connectionId).collect(Collectors.toSet());
        return this.m_network.getIOStats(false).stream().filter(ioStatsData -> liveConnections.contains(ioStatsData.getConnectionId())).map(ClientIOStats::new).collect(Collectors.toMap(ClientIOStats::getConnectionId, Function.identity()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Map<Integer, ClientAffinityStats> getAffinityStatsSnapshot() {
        HashMap<Integer, ClientAffinityStats> retval = new HashMap<Integer, ClientAffinityStats>();
        Distributer distributer = this;
        synchronized (distributer) {
            for (Map.Entry<Integer, ClientAffinityStats> e : this.m_clientAffinityStats.entrySet()) {
                retval.put(e.getKey(), (ClientAffinityStats)e.getValue().clone());
            }
        }
        return retval;
    }

    public synchronized Object[] getInstanceId() {
        return this.m_clusterInstanceId;
    }

    public synchronized void resetInstanceId() {
        this.m_clusterInstanceId = null;
    }

    public String getBuildString() {
        return this.m_buildString;
    }

    public List<Long> getThreadIds() {
        return this.m_network.getThreadIds();
    }

    public List<InetSocketAddress> getConnectedHostList() {
        ArrayList<InetSocketAddress> addressList = new ArrayList<InetSocketAddress>();
        for (NodeConnection conn : this.m_connections) {
            addressList.add(conn.getSocketAddress());
        }
        return Collections.unmodifiableList(addressList);
    }

    public Map<String, Integer> getConnectedHostIPAndPort() {
        HashMap<String, Integer> connectedHostIPAndPortMap = Maps.newHashMap();
        for (NodeConnection conn : this.m_connections) {
            connectedHostIPAndPortMap.put(conn.getSocketAddress().getAddress().getHostAddress(), conn.getSocketAddress().getPort());
        }
        return Collections.unmodifiableMap(connectedHostIPAndPortMap);
    }

    private void updateAffinityTopology(VoltTable[] tables) {
        VoltTable vt = tables[0];
        boolean cooked = false;
        if (tables.length == 1) {
            int numPartitions = vt.getRowCount() - 1;
            this.m_hashinator = new HashinatorLite(numPartitions);
        } else {
            boolean advanced = tables[1].advanceRow();
            if (!advanced) {
                System.err.println("Topology description received from the database was incomplete performance will be lower because transactions can't be routed at this client");
                return;
            }
            this.m_hashinator = new HashinatorLite(tables[1].getVarbinary("HASHCONFIG"), cooked);
        }
        this.m_partitionMasters.clear();
        HashSet<Integer> unconnected = new HashSet<Integer>();
        while (vt.advanceRow()) {
            Integer partition = (int)vt.getLong("Partition");
            String leader = vt.getString("Leader");
            String sites = vt.getString("Sites");
            if (Strings.isNullOrEmpty(sites) || Strings.isNullOrEmpty(leader)) continue;
            for (String site : sites.split(",")) {
                Integer hostId = Integer.valueOf((site = site.trim()).split(":")[0]);
                if (this.m_hostIdToConnection.containsKey(hostId)) continue;
                unconnected.add(hostId);
            }
            Integer leaderHostId = Integer.valueOf(leader.split(":")[0]);
            if (!this.m_hostIdToConnection.containsKey(leaderHostId)) continue;
            this.m_partitionMasters.put(partition, this.m_hostIdToConnection.get(leaderHostId));
        }
        if (this.m_topologyChangeAware) {
            this.m_unconnectedHosts.set(ImmutableSet.copyOf(unconnected));
        }
        this.refreshPartitionKeys(true);
    }

    private void updateProcedurePartitioning(VoltTable vt) {
        HashMap<String, Procedure> procs = Maps.newHashMap();
        while (vt.advanceRow()) {
            try {
                String jsString = vt.getString(6);
                String procedureName = vt.getString(2);
                JSONObject jsObj = new JSONObject(jsString);
                boolean readOnly = jsObj.optBoolean("readOnly");
                boolean compound = jsObj.optBoolean("compound");
                boolean single = jsObj.optBoolean("singlePartition");
                int partitionParam = -1;
                int paramType = -1;
                if (single) {
                    partitionParam = jsObj.getInt("partitionParameter");
                    paramType = jsObj.getInt("partitionParameterType");
                }
                procs.put(procedureName, new Procedure(single, compound, readOnly, partitionParam, paramType));
            }
            catch (JSONException e) {
                e.printStackTrace();
            }
        }
        ImmutableSortedMap<String, Procedure> oldProcs = this.m_procedureInfo.get();
        this.m_procedureInfo.compareAndSet(oldProcs, ImmutableSortedMap.copyOf(procs));
    }

    private void updatePartitioning(VoltTable vt) {
        ImmutableMap.Builder<Integer, Integer> builder = ImmutableMap.builder();
        ImmutableMap.Builder<Integer, Integer> keyBuilder = ImmutableMap.builder();
        while (vt.advanceRow()) {
            if (vt.getColumnCount() != 2) continue;
            Integer partitionId = (int)vt.getLong("PARTITION_ID");
            Integer key = (int)vt.getLong("PARTITION_KEY");
            builder.put(partitionId, key);
            keyBuilder.put(key, partitionId);
        }
        this.m_partitionKeys.set(builder.build());
        this.m_partitionKeyMap.set(keyBuilder.build());
    }

    public boolean isHashinatorInitialized() {
        return this.m_hashinator != null;
    }

    public long getPartitionForParameter(byte typeValue, Object value) {
        if (this.m_hashinator == null) {
            return -1L;
        }
        return this.m_hashinator.getHashedPartitionForParameter(typeValue, value);
    }

    public long getPartitionForParameter(byte[] bytes) {
        if (this.m_hashinator == null) {
            return -1L;
        }
        return this.m_hashinator.getHashedPartitionForParameter(bytes);
    }

    private ByteBuffer serializeSPI(ProcedureInvocation pi) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(pi.getSerializedSize() + 4);
        buf.putInt(buf.capacity() - 4);
        pi.flattenToBuffer(buf);
        buf.flip();
        return buf;
    }

    long getProcedureTimeoutNanos() {
        return this.m_procedureCallTimeoutNanos;
    }

    ImmutableMap<Integer, Integer> getPartitionKeys() throws NoConnectionsException, IOException, ProcCallException {
        this.refreshPartitionKeys(false);
        if (this.m_partitionUpdateStatus.get().getStatus() != 1) {
            throw new ProcCallException(this.m_partitionUpdateStatus.get());
        }
        return this.m_partitionKeys.get();
    }

    private void refreshPartitionKeys(boolean topologyUpdate) {
        if (this.m_shutdown.get()) {
            return;
        }
        try {
            PartitionUpdateCallback cb;
            ProcedureInvocation invocation = this.makeProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@GetPartitionKeys", "INTEGER");
            CountDownLatch latch = null;
            if (!topologyUpdate) {
                latch = new CountDownLatch(1);
            }
            if (!this.queue(invocation, cb = new PartitionUpdateCallback(latch), true, System.nanoTime(), TOPO_AWARE_PROC_TIMEOUT_NS)) {
                this.m_partitionUpdateStatus.set(new ClientResponseImpl(-5, new VoltTable[0], "Fails to queue the partition update query, please try later."));
            }
            if (!topologyUpdate) {
                latch.await(1L, TimeUnit.MINUTES);
            }
            this.m_lastPartitionKeyFetched.set(System.currentTimeMillis());
        }
        catch (IOException | InterruptedException e) {
            this.m_partitionUpdateStatus.set(new ClientResponseImpl(-5, new VoltTable[0], "Fails to fetch partition keys from server:" + e.getMessage()));
        }
    }

    private ProcedureInvocation makeProcedureInvocation(long handle, String procName, Object ... parameters) {
        return new ProcedureInvocation(handle, -1, -1, this.m_sysRequestPrio, procName, parameters);
    }

    void useRequestPriority() {
        this.m_sysRequestPrio = 2;
    }

    void setTopologyChangeAware(boolean topoAware) {
        this.m_topologyChangeAware = topoAware;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void createConnectionsUponTopologyChange() {
        if (!this.m_topologyChangeAware) {
            return;
        }
        if (!this.m_createConnectionUponTopoChangeInProgress.compareAndSet(false, true)) {
            return;
        }
        try {
            ImmutableSet<Integer> unconnected = this.m_unconnectedHosts.get();
            if (unconnected == null || unconnected.isEmpty()) {
                return;
            }
            ClientImpl.InternalClientStatusListener internalListener = null;
            for (ClientStatusListenerExt csl : this.m_listeners) {
                if (!(csl instanceof ClientImpl.InternalClientStatusListener)) continue;
                internalListener = (ClientImpl.InternalClientStatusListener)csl;
                break;
            }
            if (internalListener == null) {
                return;
            }
            unconnected = this.m_unconnectedHosts.getAndSet(ImmutableSet.copyOf(new HashSet()));
            for (Integer host : unconnected) {
                if (this.isHostConnected(host)) continue;
                internalListener.createConnectionsUponTopologyChange();
                break;
            }
        }
        finally {
            this.m_createConnectionUponTopoChangeInProgress.set(false);
        }
    }

    void setCreateConnectionsUponTopologyChangeComplete() throws NoConnectionsException {
        this.m_createConnectionUponTopoChangeInProgress.set(false);
        ProcedureInvocation spi = this.makeProcedureInvocation(this.m_sysHandle.getAndDecrement(), "@Statistics", "TOPO", 0);
        this.queue(spi, new TopoUpdateCallback(), true, System.nanoTime(), TOPO_AWARE_PROC_TIMEOUT_NS);
    }

    boolean isHostConnected(Integer hostId) {
        return this.m_hostIdToConnection.containsKey(hostId);
    }

    private class NodeConnection
    extends VoltProtocolHandler
    implements QueueMonitor {
        private final ConcurrentMap<Long, CallbackBookkeeping> m_callbacks = new ConcurrentHashMap<Long, CallbackBookkeeping>();
        private final NonBlockingHashMap<String, ClientStats> m_stats = new NonBlockingHashMap();
        private Connection m_connection;
        private volatile boolean m_isConnected = true;
        private volatile boolean m_isOutOfService = false;
        private boolean m_nonblockingInitDone = false;
        private final int m_hostId;
        volatile long m_lastResponseTimeNanos = System.nanoTime();
        boolean m_outstandingPing = false;
        private int m_queuedBytes;
        ClientStatusListenerExt.DisconnectCause m_closeCause = ClientStatusListenerExt.DisconnectCause.CONNECTION_CLOSED;
        private final ClientConnectionRequestStats m_connectionRequestStats;

        public NodeConnection(long[] ids, ClientConnectionRequestStats stats, int hostId) {
            this.m_connectionRequestStats = stats;
            this.m_hostId = hostId;
        }

        boolean isOutOfService() {
            return this.m_isOutOfService;
        }

        void setOutOfService() {
            this.m_isOutOfService = true;
        }

        public void createWork(long startNanos, long handle, String name, ByteBuffer buffer, ProcedureCallback callback, boolean ignoreBackpressure, long timeoutNanos, int partition) {
            assert (callback != null);
            if (timeoutNanos == 0L) {
                timeoutNanos = Distributer.this.m_procedureCallTimeoutNanos;
            }
            try {
                Distributer.this.m_rateLimiter.prepareToSendTransaction(startNanos, timeoutNanos, ignoreBackpressure);
            }
            catch (InterruptedException | TimeoutException e) {
                this.invokeCallbackWithTimeout(name, callback, startNanos, System.nanoTime(), timeoutNanos, handle, partition, ignoreBackpressure);
                return;
            }
            this.createWorkCommon(startNanos, handle, name, buffer, callback, ignoreBackpressure, timeoutNanos, partition);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean createWorkNonblocking(long startNanos, long handle, String name, ByteBuffer buffer, ProcedureCallback callback, long timeoutNanos, int partition) {
            assert (callback != null);
            if (timeoutNanos == 0L) {
                timeoutNanos = Distributer.this.m_procedureCallTimeoutNanos;
            }
            Object object = this;
            synchronized (object) {
                if (!this.m_nonblockingInitDone) {
                    Distributer.this.m_rateLimiter.setNonblockingResumeHook(this.offBackPressure());
                    this.m_nonblockingInitDone = true;
                }
            }
            object = Distributer.this;
            synchronized (object) {
                if (!Distributer.this.m_rateLimiter.prepareToSendTransactionNonblocking()) {
                    Distributer.this.reportBackpressure(true);
                    return false;
                }
            }
            this.createWorkCommon(startNanos, handle, name, buffer, callback, false, timeoutNanos, partition);
            return true;
        }

        private void createWorkCommon(long startNanos, long handle, String name, ByteBuffer buffer, ProcedureCallback callback, boolean ignoreBackpressure, long timeoutNanos, int partition) {
            assert (!this.m_callbacks.containsKey(handle));
            this.m_callbacks.put(handle, new CallbackBookkeeping(startNanos, callback, name, timeoutNanos, ignoreBackpressure, partition));
            if (timeoutNanos < ONE_SECOND_NANOS && !Distributer.isLongOp(name)) {
                long timeoutRemaining = startNanos + timeoutNanos - System.nanoTime();
                this.submitDiscreteTimeoutTask(handle, Math.max(0L, timeoutRemaining));
            }
            if (this.m_isConnected) {
                this.m_connection.writeStream().enqueue(buffer);
                this.incrementInvocation(handle);
                return;
            }
            if (this.m_callbacks.remove(handle) != null) {
                String msg = String.format("Connection to database host (%s) was lost before a response was received", this.m_connection.getHostnameOrIP());
                ClientResponseImpl resp = new ClientResponseImpl(-4, new VoltTable[0], msg);
                try {
                    callback.clientCallback(resp);
                }
                catch (Exception e) {
                    Distributer.this.uncaughtException(callback, resp, e);
                }
                Distributer.this.m_rateLimiter.transactionResponseReceived(System.nanoTime(), -1, ignoreBackpressure);
            }
        }

        private void incrementInvocation(long clientHandle) {
            if (clientHandle >= 0L && clientHandle < 0x7FFFFFFFFFFFFFFBL) {
                this.m_connectionRequestStats.incrementInvocation();
            }
        }

        private void incrementResponse(long clientHandle) {
            if (clientHandle >= 0L && clientHandle < 0x7FFFFFFFFFFFFFFBL) {
                this.m_connectionRequestStats.incrementResponse();
            }
        }

        void submitDiscreteTimeoutTask(final long handle, long timeoutNanos) {
            Distributer.this.m_tmoExec.schedule(new Runnable(){

                @Override
                public void run() {
                    NodeConnection.this.handleTimedoutCallback(handle, System.nanoTime());
                }
            }, timeoutNanos, TimeUnit.NANOSECONDS);
        }

        private void handleTimedoutCallback(long handle, long endNanos) {
            CallbackBookkeeping cb = (CallbackBookkeeping)this.m_callbacks.remove(handle);
            if (cb != null) {
                this.invokeCallbackWithTimeout(cb.name, cb.callback, cb.startNanos, endNanos, cb.procedureTimeoutNanos, handle, cb.partition, cb.ignoreBackpressure);
            }
        }

        private void invokeCallbackWithTimeout(String procName, ProcedureCallback callback, long startNanos, long endNanos, long timeoutNanos, long handle, int hashedPartition, boolean ignoreBackpressure) {
            int partition = -1;
            ImmutableMap<Integer, Integer> map = Distributer.this.m_partitionKeyMap.get();
            if (map != null && map.containsKey(hashedPartition)) {
                partition = map.get(hashedPartition);
            }
            ClientResponseImpl r = new ClientResponseImpl(-6, -128, "", new VoltTable[0], String.format("No response received in the allotted time (set to %d ms) from %s (host id:%d) on partition %d).", TimeUnit.NANOSECONDS.toMillis(timeoutNanos), this.m_connection.getHostnameOrIP(), this.m_hostId, partition));
            long deltaNanos = Math.max(1L, endNanos - startNanos);
            r.setClientHandle(handle);
            r.setClientRoundtrip(deltaNanos);
            r.setClusterRoundtrip((int)TimeUnit.NANOSECONDS.toMillis(deltaNanos));
            try {
                callback.clientCallback(r);
            }
            catch (Throwable t) {
                Distributer.this.uncaughtException(callback, r, t);
            }
            Distributer.this.m_rateLimiter.transactionResponseReceived(endNanos, -1, ignoreBackpressure);
            this.updateStatsForTimeout(procName, r.getClientRoundtripNanos(), r.getClusterRoundtrip(), handle);
        }

        void sendPing() {
            ProcedureInvocation invocation = Distributer.this.makeProcedureInvocation(Long.MAX_VALUE, "@Ping", new Object[0]);
            ByteBuffer buf = ByteBuffer.allocate(4 + invocation.getSerializedSize());
            buf.putInt(buf.capacity() - 4);
            try {
                invocation.flattenToBuffer(buf);
                buf.flip();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            this.m_connection.writeStream().enqueue(buf);
            this.m_outstandingPing = true;
        }

        private void updateStatsForTimeout(final String procName, final long roundTripNanos, final int clusterRoundTrip, final long clientHandle) {
            this.m_connection.queueTask(new Runnable(){

                @Override
                public void run() {
                    NodeConnection.this.updateStats(procName, roundTripNanos, clusterRoundTrip, false, false, true, clientHandle);
                }
            });
        }

        private void updateStats(String procName, long roundTripNanos, int clusterRoundTrip, boolean abort, boolean failure, boolean timeout, long clientHandle) {
            ClientStats stats = this.m_stats.get(procName);
            if (stats == null) {
                stats = new ClientStats();
                stats.m_connectionId = this.connectionId();
                stats.m_hostname = this.m_connection.getHostnameOrIP();
                stats.m_port = this.m_connection.getRemotePort();
                stats.m_procName = procName;
                stats.m_startTS = System.currentTimeMillis();
                stats.m_endTS = Long.MIN_VALUE;
                this.m_stats.put(procName, stats);
            }
            this.incrementInvocation(clientHandle);
            stats.update(roundTripNanos, clusterRoundTrip, abort, failure, timeout);
        }

        @Override
        public void handleMessage(ByteBuffer buf, Connection c) {
            block16: {
                CallbackBookkeeping stuff;
                long handle;
                ClientResponseImpl response;
                long endNanos;
                block15: {
                    endNanos = System.nanoTime();
                    response = new ClientResponseImpl();
                    try {
                        response.initFromBuffer(buf);
                    }
                    catch (IOException e1) {
                        e1.printStackTrace();
                    }
                    this.m_lastResponseTimeNanos = endNanos;
                    handle = response.getClientHandle();
                    if (handle == Long.MAX_VALUE) {
                        this.m_outstandingPing = false;
                        return;
                    }
                    if (handle == 0x7FFFFFFFFFFFFFFEL) {
                        TopoUpdateCallback cb = new TopoUpdateCallback();
                        try {
                            cb.clientCallback(response);
                        }
                        catch (Exception e) {
                            Distributer.this.uncaughtException(cb, response, e);
                        }
                        return;
                    }
                    if (handle == 0x7FFFFFFFFFFFFFFDL) {
                        ProcUpdateCallback cb = new ProcUpdateCallback();
                        try {
                            cb.clientCallback(response);
                        }
                        catch (Exception e) {
                            Distributer.this.uncaughtException(cb, response, e);
                        }
                        return;
                    }
                    if (handle == 0x7FFFFFFFFFFFFFFCL) {
                        this.m_isOutOfService = true;
                        return;
                    }
                    stuff = (CallbackBookkeeping)this.m_callbacks.remove(response.getClientHandle());
                    if (stuff != null) break block15;
                    if (handle < 0L) break block16;
                    for (ClientStatusListenerExt listener : Distributer.this.m_listeners) {
                        listener.lateProcedureResponse(response, this.m_connection.getHostnameOrIP(), this.m_connection.getRemotePort());
                    }
                    break block16;
                }
                this.incrementResponse(handle);
                long callTimeNanos = stuff.startNanos;
                long deltaNanos = Math.max(1L, endNanos - callTimeNanos);
                ProcedureCallback cb = stuff.callback;
                assert (cb != null);
                boolean abort = response.aborted();
                boolean error = response.failed();
                int clusterRoundTrip = response.getClusterRoundtrip();
                Distributer.this.m_rateLimiter.transactionResponseReceived(endNanos, clusterRoundTrip, stuff.ignoreBackpressure);
                this.updateStats(stuff.name, deltaNanos, clusterRoundTrip, abort, error, false, -1L);
                response.setClientRoundtrip(deltaNanos);
                assert (response.getHashes() == null) : "A determinism hash snuck into the client wire protocol";
                try {
                    cb.clientCallback(response);
                }
                catch (Throwable t) {
                    Distributer.this.uncaughtException(cb, response, t);
                }
            }
        }

        @Override
        public int getMaxRead() {
            return Integer.MAX_VALUE;
        }

        public boolean hadBackPressure() {
            return this.m_connection.writeStream().hadBackPressure();
        }

        public void setConnection(Connection c) {
            this.m_connection = c;
            for (ClientStatusListenerExt listener : Distributer.this.m_listeners) {
                listener.connectionCreated(this.m_connection.getHostnameOrIP(), this.m_connection.getRemotePort(), ClientStatusListenerExt.AutoConnectionStatus.SUCCESS);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void stopping(Connection c) {
            super.stopping(c);
            this.m_isConnected = false;
            Distributer distributer = Distributer.this;
            synchronized (distributer) {
                Iterator<Map.Entry<Integer, NodeConnection>> i = Distributer.this.m_partitionMasters.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry<Integer, NodeConnection> entry = i.next();
                    if (entry.getValue() != this) continue;
                    i.remove();
                }
                Distributer.this.m_hostIdToConnection.remove(this.m_hostId);
                Distributer.this.m_connections.remove(this);
                for (ClientStatusListenerExt s : Distributer.this.m_listeners) {
                    s.connectionLost(this.m_connection.getHostnameOrIP(), this.m_connection.getRemotePort(), Distributer.this.m_connections.size(), this.m_closeCause);
                }
                if (Distributer.this.m_subscribedConnection == this && !Distributer.this.m_subscriptionRequestPending && !Distributer.this.m_shutdown.get()) {
                    try {
                        Distributer.this.m_subsExec.schedule(new Runnable(){

                            @Override
                            public void run() {
                                Distributer.this.subscribeToNewNode();
                            }
                        }, (long)new Random().nextInt(RESUBSCRIPTION_DELAY_MS), TimeUnit.MILLISECONDS);
                    }
                    catch (RejectedExecutionException ree) {
                        return;
                    }
                }
            }
            ClientResponseImpl r = new ClientResponseImpl(-4, new VoltTable[0], "Connection to database host (" + this.m_connection.getHostnameOrIP() + ") was lost before a response was received");
            for (Map.Entry e : this.m_callbacks.entrySet()) {
                if (this.m_callbacks.remove(e.getKey()) == null) continue;
                CallbackBookkeeping callBk = (CallbackBookkeeping)e.getValue();
                try {
                    callBk.callback.clientCallback(r);
                }
                catch (Throwable t) {
                    Distributer.this.uncaughtException(callBk.callback, r, t);
                }
                Distributer.this.m_rateLimiter.transactionResponseReceived(System.nanoTime(), -1, callBk.ignoreBackpressure);
            }
        }

        @Override
        public Runnable offBackPressure() {
            return new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Distributer distributer = Distributer.this;
                    synchronized (distributer) {
                        Distributer.this.reportBackpressure(false);
                    }
                }
            };
        }

        @Override
        public Runnable onBackPressure() {
            return null;
        }

        @Override
        public QueueMonitor writestreamMonitor() {
            return this;
        }

        @Override
        public boolean queue(int bytes) {
            this.m_queuedBytes += bytes;
            return this.m_queuedBytes > Distributer.this.m_maxQueuedBytes;
        }

        public InetSocketAddress getSocketAddress() {
            return this.m_connection.getRemoteSocketAddress();
        }
    }

    private class CallExpiration
    implements Runnable {
        private CallExpiration() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                ArrayList<NodeConnection> connections = new ArrayList<NodeConnection>();
                Distributer distributer = Distributer.this;
                synchronized (distributer) {
                    connections.addAll(Distributer.this.m_connections);
                }
                long nowNanos = System.nanoTime();
                for (NodeConnection c : connections) {
                    long sinceLastResponse = Math.max(1L, nowNanos - c.m_lastResponseTimeNanos);
                    if (c.m_outstandingPing && sinceLastResponse > Distributer.this.m_connectionResponseTimeoutNanos) {
                        c.m_closeCause = ClientStatusListenerExt.DisconnectCause.TIMEOUT;
                        c.m_connection.unregister();
                    }
                    if (!c.m_outstandingPing && sinceLastResponse > Distributer.this.m_connectionResponseTimeoutNanos / 3L) {
                        c.sendPing();
                    }
                    for (Map.Entry e : c.m_callbacks.entrySet()) {
                        long handle = (Long)e.getKey();
                        CallbackBookkeeping cb = (CallbackBookkeeping)e.getValue();
                        long deltaNanos = Math.max(1L, nowNanos - cb.startNanos);
                        if (deltaNanos <= cb.procedureTimeoutNanos || Distributer.isLongOp(cb.name) && deltaNanos < MINIMUM_LONG_RUNNING_SYSTEM_CALL_TIMEOUT_NANOS) continue;
                        c.handleTimedoutCallback(handle, nowNanos);
                    }
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    private class SubscribeCallback
    implements ProcedureCallback {
        private SubscribeCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void clientCallback(ClientResponse response) throws Exception {
            if (Distributer.this.m_shutdown.get()) {
                return;
            }
            if (response.getStatus() == -4) {
                if (!Distributer.this.m_connections.isEmpty()) {
                    Distributer.this.subscribeToNewNode();
                }
                return;
            }
            if (response.getStatus() != 1 && !Distributer.this.m_shutdown.get()) {
                try {
                    Distributer.this.m_subsExec.schedule(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                Distributer.this.subscribeToNewNode();
                            }
                            catch (Throwable t) {
                                t.printStackTrace();
                                Throwables.propagate(t);
                            }
                        }
                    }, 2L, TimeUnit.MINUTES);
                }
                catch (RejectedExecutionException rejectedExecutionException) {
                    // empty catch block
                }
                return;
            }
            Distributer distributer = Distributer.this;
            synchronized (distributer) {
                Distributer.this.m_subscriptionRequestPending = false;
            }
        }
    }

    private class TopoUpdateCallback
    implements ProcedureCallback {
        private TopoUpdateCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void clientCallback(ClientResponse clientResponse) throws Exception {
            if (clientResponse.getStatus() != 1) {
                try {
                    Distributer.this.m_subsExec.submit(Distributer.this::subscribeToNewNode);
                }
                catch (RejectedExecutionException rejectedExecutionException) {
                    // empty catch block
                }
                return;
            }
            try {
                Distributer distributer = Distributer.this;
                synchronized (distributer) {
                    VoltTable[] results = clientResponse.getResults();
                    if (results != null && results.length > 1) {
                        Distributer.this.updateAffinityTopology(results);
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private class ProcUpdateCallback
    implements ProcedureCallback {
        private ProcUpdateCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void clientCallback(ClientResponse clientResponse) throws Exception {
            if (clientResponse.getStatus() != 1) {
                return;
            }
            try {
                Distributer distributer = Distributer.this;
                synchronized (distributer) {
                    VoltTable[] results = clientResponse.getResults();
                    if (results != null && results.length == 1) {
                        VoltTable vt = results[0];
                        Distributer.this.updateProcedurePartitioning(vt);
                    }
                    Distributer.this.m_fetchedCatalog = true;
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class CxnStatsData {
        ClientAffinityStats stats;
        boolean readOnly;
        boolean roundRobin;

        private CxnStatsData() {
        }
    }

    private static final class Procedure {
        static final int PARAMETER_NONE = -1;
        private final Type procType;
        private final boolean readOnly;
        private final int partitionParameter;
        private final int partitionParameterType;

        private Procedure(boolean single, boolean compound, boolean readOnly, int partitionParameter, int partitionParameterType) {
            this.procType = single ? Type.SINGLE : (compound ? Type.COMPOUND : Type.MULTI);
            this.readOnly = readOnly;
            this.partitionParameter = single ? partitionParameter : -1;
            this.partitionParameterType = single ? partitionParameterType : -1;
        }

        private static enum Type {
            SINGLE,
            MULTI,
            COMPOUND;

        }
    }

    private class PartitionUpdateCallback
    implements ProcedureCallback {
        private final CountDownLatch m_latch;

        PartitionUpdateCallback(CountDownLatch latch) {
            this.m_latch = latch;
        }

        @Override
        public void clientCallback(ClientResponse clientResponse) throws Exception {
            VoltTable[] results;
            if (clientResponse.getStatus() == 1 && (results = clientResponse.getResults()) != null && results.length > 0) {
                Distributer.this.updatePartitioning(results[0]);
            }
            Distributer.this.m_partitionUpdateStatus.set(clientResponse);
            if (this.m_latch != null) {
                this.m_latch.countDown();
            }
        }
    }

    private class CallbackBookkeeping {
        final long startNanos;
        final long procedureTimeoutNanos;
        final ProcedureCallback callback;
        final String name;
        final boolean ignoreBackpressure;
        final int partition;

        public CallbackBookkeeping(long startNanos, ProcedureCallback callback, String name, long timeoutNanos, boolean ignoreBackpressure, int partition) {
            assert (callback != null);
            this.startNanos = startNanos;
            this.callback = callback;
            this.name = name;
            this.procedureTimeoutNanos = timeoutNanos;
            this.ignoreBackpressure = ignoreBackpressure;
            this.partition = partition;
        }
    }
}

