/*
 * Decompiled with CFR 0.152.
 */
package org.voltcore.network;

import com.google_voltpatches.common.annotations.VisibleForTesting;
import com.google_voltpatches.common.collect.ImmutableList;
import com.google_voltpatches.common.util.concurrent.SettableFuture;
import io.netty_voltpatches.NinjaKeySet;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltcore.network.CipherExecutor;
import org.voltcore.network.Connection;
import org.voltcore.network.InputHandler;
import org.voltcore.network.NetworkDBBPool;
import org.voltcore.network.ReverseDNSPolicy;
import org.voltcore.network.TLSException;
import org.voltcore.network.VoltPort;
import org.voltcore.network.VoltPortFactory;
import org.voltcore.network.metrics.IOStatsCollector;
import org.voltcore.network.metrics.IOStatsData;
import org.voltcore.network.metrics.NetworkMetricsCollector;
import org.voltcore.network.metrics.TLSStatsCollector;
import org.voltcore.network.metrics.TLSStatsData;
import org.voltcore.network.util.TimeProvider;
import org.voltcore.utils.LatencyWatchdog;

class VoltNetwork
implements Runnable,
IOStatsCollector,
TLSStatsCollector {
    // Selector that every channel handled by this network is registered with.
    private final Selector m_selector;
    // Class-scoped logger, used for selector and network-thread failures.
    private static final Logger m_logger = LoggerFactory.getLogger(VoltNetwork.class);
    // Logger named "NETWORK", used for per-connection messages.
    private static final Logger networkLog = LoggerFactory.getLogger((String)"NETWORK");
    // Work queued for the selector loop; run() drains it after each select() and again after dispatching callbacks.
    private final Queue<Runnable> m_tasks = new ConcurrentLinkedQueue<Runnable>();
    // Metrics sink: initialized with the selector in run(), handed off each loop pass, destroyed on shutdown.
    private final NetworkMetricsCollector m_metricCollector;
    // Set by shutdown(); polled by the loops in run().
    private volatile boolean m_shouldStop = false;
    // Thread that executes run(); null when the instance was built through the test constructor.
    private final Thread m_thread;
    // Ports known to this network. A plain HashSet: the visible code only touches it from
    // queued tasks and selector callbacks (presumably all on the network thread - confirm).
    private final Set<VoltPort> m_ports = new HashSet<VoltPort>();
    // Counter maintained alongside m_ports and exposed through numPorts().
    private final AtomicInteger m_numPorts = new AtomicInteger();
    // Buffer pool handed to every created port; cleared during network shutdown.
    final NetworkDBBPool m_pool = new NetworkDBBPool();
    // Name given to the network thread ("Test Selector Thread" for the test constructor).
    final String networkThreadName;
    // Selected-key view obtained from NinjaKeySet.instrumentSelector(); read by optimizedInvokeCallbacks().
    private final NinjaKeySet m_ninjaSelectedKeys;

    VoltNetwork(int networkId, String networkName, NetworkMetricsCollector metricCollector) {
        this.networkThreadName = "DB " + networkName + " Network - " + networkId;
        this.m_thread = new Thread((Runnable)this, this.networkThreadName);
        this.m_thread.setDaemon(true);
        try {
            this.m_selector = Selector.open();
        }
        catch (IOException ex) {
            m_logger.error("Could not open selector.", (Throwable)ex);
            throw new RuntimeException(ex);
        }
        this.m_ninjaSelectedKeys = NinjaKeySet.instrumentSelector(this.m_selector);
        this.m_metricCollector = metricCollector;
    }

    /**
     * Test-only constructor: wraps a caller-supplied selector and creates no thread,
     * so {@code m_thread} stays {@code null}.
     *
     * @param selector        selector to instrument and use
     * @param metricCollector collector driven by the selector loop
     */
    @VisibleForTesting
    VoltNetwork(Selector selector, NetworkMetricsCollector metricCollector) {
        m_thread = null;
        m_selector = selector;
        networkThreadName = "Test Selector Thread";
        m_ninjaSelectedKeys = NinjaKeySet.instrumentSelector(selector);
        m_metricCollector = metricCollector;
    }

    /**
     * Starts the network thread, which then executes {@link #run()}.
     */
    void start() {
        m_thread.start();
    }

    /**
     * Requests the selector loop to stop and, when a network thread exists,
     * wakes the selector and waits for that thread to finish.
     *
     * @throws InterruptedException if interrupted while joining the network thread
     */
    void shutdown() throws InterruptedException {
        m_shouldStop = true;
        if (m_thread == null) {
            // Test-constructed instance: there is no thread to wake or join.
            return;
        }
        m_selector.wakeup();
        m_thread.join();
    }

    /**
     * @return {@code true} once {@link #shutdown()} has set the stop flag
     */
    boolean isStopping() {
        return m_shouldStop;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Switches the channel to non-blocking mode with keep-alive enabled, then queues a
     * task that creates a {@link VoltPort} for it and registers the channel with the
     * selector. The calling thread blocks until that queued task has been executed.
     *
     * <p>The port is recorded in {@code m_ports} (and counted) from a {@code finally}
     * block, i.e. also when the selector registration itself throws.
     *
     * @param channel       socket channel to register
     * @param handler       input handler given to the created port
     * @param interestOps   initial interest set for the selection key
     * @param dns           reverse-DNS policy; anything but {@code NONE} triggers hostname
     *                      resolution, synchronously for {@code SYNCHRONOUS}
     * @param cipherService passed through to the port factory
     * @param sslEngine     passed through to the port factory
     * @param timeProvider  passed through to the port factory
     * @return the port created for the channel
     * @throws IOException if configuring the channel fails, or wrapping whatever made the
     *                     queued registration fail (including interruption while waiting)
     */
    Connection registerChannel(SocketChannel channel, InputHandler handler, int interestOps, ReverseDNSPolicy dns, CipherExecutor cipherService, SSLEngine sslEngine, TimeProvider timeProvider) throws IOException {
        synchronized (channel.blockingLock()) {
            channel.configureBlocking(false);
            channel.socket().setKeepAlive(true);
        }
        Callable<Connection> registerTask = () -> {
            VoltPort port = VoltPortFactory.createVoltPort(this, handler, (InetSocketAddress)channel.socket().getRemoteSocketAddress(), this.m_pool, cipherService, sslEngine, timeProvider);
            port.registering();
            if (dns != ReverseDNSPolicy.NONE) {
                port.resolveHostname(dns == ReverseDNSPolicy.SYNCHRONOUS);
            }
            try {
                SelectionKey key = channel.register(this.m_selector, interestOps, null);
                port.setKey(key);
                port.registered();
                // Attach last: a key seen without an attachment is skipped by the callback loops.
                key.attach(port);
                return port;
            }
            finally {
                this.m_ports.add(port);
                this.m_numPorts.incrementAndGet();
            }
        };
        FutureTask<Connection> ft = new FutureTask<Connection>(registerTask);
        this.m_tasks.offer(ft);
        this.m_selector.wakeup();
        try {
            return ft.get();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Builds the runnable that detaches a connection from this network.
     *
     * <p>When run, it casts {@code c} to {@link VoltPort} and, if the port is still in
     * {@code m_ports}, calls {@code port.unregistering()}, clears and cancels the port's
     * selection key, removes the port from {@code m_ports} and decrements the port count.
     * {@code port.unregistered()} is invoked from the outermost {@code finally}, so it
     * runs even when the port was not (or no longer) tracked.
     *
     * @param c connection to unregister; must actually be a {@link VoltPort}
     * @return the runnable performing the unregistration; it is not executed here
     */
    private Runnable getUnregisterRunnable(Connection c) {
        return () -> {
            VoltPort port = (VoltPort)c;
            assert (c != null);
            SelectionKey selectionKey = port.getKey();
            try {
                // Not tracked: skip the teardown, but the outer finally still fires unregistered().
                if (!this.m_ports.contains(port)) {
                    return;
                }
                try {
                    port.unregistering();
                }
                finally {
                    // Runs even if unregistering() throws, so the key is always released.
                    try {
                        selectionKey.attach(null);
                        selectionKey.cancel();
                    }
                    finally {
                        // Bookkeeping is updated even if detaching/cancelling the key throws.
                        this.m_ports.remove(port);
                        this.m_numPorts.decrementAndGet();
                    }
                }
            }
            finally {
                port.unregistered();
            }
        };
    }

    /**
     * Queues the unregistration of a connection for the selector loop and wakes the selector.
     *
     * @param c connection to unregister
     * @return a future that completes (with {@code null}) once the queued task has run
     */
    Future<?> unregisterChannel(Connection c) {
        Runnable unregister = getUnregisterRunnable(c);
        FutureTask<Object> pending = new FutureTask<Object>(unregister, null);
        m_tasks.offer(pending);
        m_selector.wakeup();
        return pending;
    }

    /**
     * Convenience overload equivalent to {@code addToChangeList(port, false)}.
     *
     * @param port port whose interests should be re-installed by the selector loop
     */
    @VisibleForTesting
    void addToChangeList(VoltPort port) {
        addToChangeList(port, false);
    }

    /**
     * Queues work for {@code port} on the selector loop and wakes the selector.
     *
     * @param port     port the queued task operates on
     * @param runFirst when {@code true} the task invokes the port via {@code callPort};
     *                 otherwise it only re-installs the port's interests
     */
    void addToChangeList(final VoltPort port, boolean runFirst) {
        final Runnable change;
        if (runFirst) {
            change = () -> callPort(port);
        } else {
            change = () -> installInterests(port);
        }
        m_tasks.offer(change);
        m_selector.wakeup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selector loop. Until the stop flag is set, each pass pets the latency watchdog,
     * hands off metrics, blocks in {@code select()}, runs queued tasks, dispatches ready
     * keys (through the NinjaKeySet path when supported) and runs queued tasks again.
     * Any throwable raised inside a pass is logged and the loop is re-entered; on exit
     * the network is shut down.
     */
    @Override
    public void run() {
        m_metricCollector.initialize(m_selector);
        final ThreadLocalRandom random = ThreadLocalRandom.current();
        try {
            while (!m_shouldStop) {
                try {
                    while (!m_shouldStop) {
                        LatencyWatchdog.pet();
                        m_metricCollector.handOffMetrics();
                        final int readyKeys = m_selector.select();
                        runQueuedTasks();
                        if (readyKeys > 0) {
                            if (NinjaKeySet.supported) {
                                optimizedInvokeCallbacks(random);
                            } else {
                                invokeCallbacks(random);
                            }
                        }
                        runQueuedTasks();
                    }
                } catch (Throwable ex) {
                    m_logger.error("Exception in network thread", ex);
                }
            }
        } catch (Throwable t) {
            m_logger.error("Unexpected exception in network thread", t);
        } finally {
            try {
                networkShutdown();
            } catch (Throwable t) {
                m_logger.error("Error shutting down network", t);
            }
        }
    }

    /** Runs queued tasks in FIFO order until the queue reports empty. */
    private void runQueuedTasks() {
        for (Runnable task = m_tasks.poll(); task != null; task = m_tasks.poll()) {
            task.run();
        }
    }

    /**
     * Tears the network down: unregisters every port still attached to a selection key,
     * destroys the metrics collector, clears the buffer pool and closes the selector.
     * Failures while unregistering an individual port are logged and do not stop the
     * remaining cleanup.
     */
    private void networkShutdown() {
        for (SelectionKey key : this.m_selector.keys()) {
            VoltPort port = (VoltPort)key.attachment();
            if (port == null) {
                continue;
            }
            try {
                this.getUnregisterRunnable(port).run();
            }
            catch (Throwable e) {
                networkLog.error("Exception unregistering port " + port, e);
            }
        }
        this.m_metricCollector.destroy();
        this.m_pool.clear();
        try {
            this.m_selector.close();
        }
        catch (IOException e) {
            // Previously logged with a null message, which left the log entry unexplained.
            m_logger.error("Failed to close selector", (Throwable)e);
        }
    }

    /**
     * Brings the selector state in line with the port's state: a dead port is
     * unregistered and its channel closed, any other port gets its interest ops
     * re-applied. A port that reports itself as running is left alone.
     *
     * @param port port to process
     */
    void installInterests(VoltPort port) {
        try {
            if (port.isRunning()) {
                assert false : "Shouldn't be running since it is all single threaded now?";
                return;
            }
            if (!port.isDead()) {
                resumeSelection(port);
                return;
            }
            getUnregisterRunnable(port).run();
            try {
                port.m_selectionKey.channel().close();
            } catch (IOException ignored) {
                // Closing a dead port's channel is best effort; nothing to recover here.
            }
        } catch (CancelledKeyException e) {
            networkLog.warn("Had a cancelled key exception while processing queued runnables for port " + port, e);
        }
    }

    /**
     * Re-applies the port's interest ops to its selection key. If the key is no longer
     * valid the port is dropped from the tracked set instead.
     *
     * <p>The port counter is only decremented when the port was actually removed, so a
     * port that has already been unregistered cannot be counted down a second time.
     *
     * @param port port whose selection should resume
     */
    private void resumeSelection(VoltPort port) {
        SelectionKey key = port.getKey();
        if (key.isValid()) {
            key.interestOps(port.interestOps());
        } else if (this.m_ports.remove(port)) {
            // Keep m_numPorts in step with m_ports: decrement only for a real removal.
            this.m_numPorts.decrementAndGet();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs one port's handler: locks the port for work, clears its interest ops and
     * invokes {@code port.run()}. A cancelled key only resets the port's running flag;
     * any other exception kills the port and is logged (debug for a normal close,
     * warn otherwise). Interests are always re-installed afterwards.
     *
     * @param port port to run
     */
    private void callPort(VoltPort port) {
        try {
            port.lockForHandlingWork();
            port.getKey().interestOps(0);
            port.run();
        } catch (CancelledKeyException e) {
            port.m_running = false;
        } catch (Exception e) {
            port.die();
            final String address = port.getHostnameAndIPAndPort();
            if (!normalClose(e)) {
                networkLog.warn("Connection closed unexpectedly for {}. {}", (Object)address, (Object)e.getMessage());
            } else if (m_logger.isDebugEnabled()) {
                m_logger.debug("Connection closed for {}", (Object)address);
            }
        } finally {
            installInterests(port);
        }
    }

    /**
     * Decides whether an exception represents an ordinary connection close.
     *
     * <p>That is the case for an {@link IOException} whose trimmed message is
     * "connection reset by peer" or "broken pipe" (compared case-insensitively and
     * independently of the default locale), for a {@link TLSException} that reports
     * {@code isClosed()}, and for a {@link ClosedChannelException}.
     *
     * @param ex exception raised while running a port
     * @return {@code true} if the close should be treated as normal
     */
    private boolean normalClose(Exception ex) {
        if (ex instanceof IOException) {
            String message = ex.getMessage();
            if (message != null) {
                String error = message.trim();
                // equalsIgnoreCase avoids the default-locale case mapping of toLowerCase().
                if (error.equalsIgnoreCase("connection reset by peer") || error.equalsIgnoreCase("broken pipe")) {
                    return true;
                }
            }
        }
        if (ex instanceof TLSException) {
            return ((TLSException)ex).isClosed();
        }
        return ex instanceof ClosedChannelException;
    }

    /**
     * Dispatches the selector's selected keys starting at a random position, wrapping
     * around to the beginning, then clears the selected-key set.
     *
     * @param r random source used to pick the starting position
     */
    protected void invokeCallbacks(ThreadLocalRandom r) {
        final Set<SelectionKey> ready = m_selector.selectedKeys();
        final int total = ready.size();
        final int pivot = r.nextInt(total);
        // Tail first (pivot..total), then the head (0..pivot).
        callPortFromSelectedKeys(pivot, total, ready);
        callPortFromSelectedKeys(0, pivot, ready);
        ready.clear();
    }

    /**
     * Invokes the ports attached to the keys at iteration positions
     * {@code [startIdx, keyCount)} of {@code selectedKeys}. Keys without an attachment
     * are skipped.
     *
     * <p>The position advances for every key, attached or not. Skipping a key without
     * advancing would make the loop read past its range: a
     * {@code NoSuchElementException} at the end of the set, or a second invocation of a
     * key that belongs to the other half of the rotation.
     *
     * @param startIdx     position of the first key to process
     * @param keyCount     position one past the last key to process
     * @param selectedKeys keys to iterate over
     */
    private void callPortFromSelectedKeys(int startIdx, int keyCount, Set<SelectionKey> selectedKeys) {
        Iterator<SelectionKey> it = selectedKeys.iterator();
        // Skip the keys in front of the requested range.
        for (int skipped = 0; skipped < startIdx; ++skipped) {
            it.next();
        }
        for (int idx = startIdx; idx < keyCount; ++idx) {
            Object obj = it.next().attachment();
            if (obj == null) {
                continue;
            }
            this.callPort((VoltPort)obj);
        }
    }

    /**
     * Array-based counterpart of {@code invokeCallbacks}: walks the NinjaKeySet key
     * array once, starting at a random index and wrapping around, invoking every key
     * that has an attachment, then clears the key set.
     *
     * @param r random source used to pick the starting index
     */
    protected void optimizedInvokeCallbacks(ThreadLocalRandom r) {
        final int numKeys = m_ninjaSelectedKeys.size();
        final int startIndex = r.nextInt(numKeys);
        final SelectionKey[] keys = m_ninjaSelectedKeys.keys();
        for (int offset = 0; offset < numKeys; ++offset) {
            // Rotate: startIndex..numKeys-1 first, then 0..startIndex-1.
            int idx = startIndex + offset;
            if (idx >= numKeys) {
                idx -= numKeys;
            }
            final Object attachment = keys[idx].attachment();
            if (attachment != null) {
                callPort((VoltPort)attachment);
            }
        }
        m_ninjaSelectedKeys.clear();
    }

    /**
     * Collects one {@link IOStatsData} entry per tracked port.
     *
     * @param collectDelta forwarded unchanged to each port's read/write counters
     * @return an immutable list with one entry per port
     */
    private List<IOStatsData> getIOStatsImpl(boolean collectDelta) {
        final List<IOStatsData> collected = new ArrayList<IOStatsData>(m_ports.size());
        for (VoltPort port : m_ports) {
            final long readBytes = port.readStream().getBytesRead(collectDelta);
            final long readMessages = port.getMessagesRead(collectDelta);
            // Index 0 holds bytes written, index 1 holds messages written.
            final long[] written = port.writeStream().getBytesAndMessagesWritten(collectDelta);
            collected.add(new IOStatsData(port.connectionId(), port.getHostnameOrIP(), readBytes, written[0], readMessages, written[1]));
        }
        return ImmutableList.copyOf(collected);
    }

    /**
     * Queues an I/O statistics collection for the selector loop and wakes the selector.
     *
     * @param interval forwarded to the collection as its delta flag
     * @return a future yielding the per-port statistics once the queued task has run
     */
    @Override
    public Future<List<IOStatsData>> getIOStats(boolean interval) {
        Callable<List<IOStatsData>> collect = () -> getIOStatsImpl(interval);
        FutureTask<List<IOStatsData>> pending = new FutureTask<List<IOStatsData>>(collect);
        m_tasks.offer(pending);
        m_selector.wakeup();
        return pending;
    }

    /**
     * Collects TLS statistics for every tracked port that reports both encryption and
     * decryption figures; ports returning {@code null} for either are left out.
     *
     * @param interval forwarded unchanged to each port's statistics getters
     * @return a list with one entry per reporting port
     */
    private List<TLSStatsData> getTLSStatsImpl(boolean interval) {
        final ArrayList<TLSStatsData> collected = new ArrayList<TLSStatsData>(m_ports.size());
        for (VoltPort port : m_ports) {
            final long[] encryption = port.getEncryptionStats(interval);
            final long[] decryption = port.getDecryptionStats(interval);
            if (encryption != null && decryption != null) {
                collected.add(new TLSStatsData(port.connectionId(), port.getHostnameOrIP(), encryption, decryption));
            }
        }
        return collected;
    }

    /**
     * Queues a TLS statistics collection for the selector loop and wakes the selector.
     *
     * @param interval forwarded to the collection
     * @return a future yielding the TLS statistics once the queued task has run
     */
    @Override
    public Future<List<TLSStatsData>> getTLSStats(boolean interval) {
        Callable<List<TLSStatsData>> collect = () -> getTLSStatsImpl(interval);
        FutureTask<List<TLSStatsData>> pending = new FutureTask<List<TLSStatsData>>(collect);
        m_tasks.offer(pending);
        m_selector.wakeup();
        return pending;
    }

    /**
     * @return the id of the network thread, boxed
     */
    Long getThreadId() {
        final long id = m_thread.getId();
        return Long.valueOf(id);
    }

    /**
     * Queues a task for the selector loop and wakes the selector so the task is picked up.
     *
     * @param r task to run
     */
    void queueTask(Runnable r) {
        m_tasks.offer(r);
        m_selector.wakeup();
    }

    /**
     * @return the current value of the port counter
     */
    int numPorts() {
        return m_numPorts.get();
    }

    /**
     * Queues a task that snapshots the tracked ports and returns a future for the result.
     *
     * <p>The snapshot is built as a {@code HashSet<Connection>} so it matches the
     * future's element type; a {@code HashSet<VoltPort>} is not a {@code Set<Connection>}.
     *
     * @return a future completed with a copy of the connection set once the queued task has run
     */
    public Future<Set<Connection>> getConnections() {
        final SettableFuture<Set<Connection>> connectionsFuture = SettableFuture.create();
        this.queueTask(new Runnable(){

            @Override
            public void run() {
                // Copy, so the caller never sees later changes to the live port set.
                Set<Connection> snapshot = new HashSet<Connection>(VoltNetwork.this.m_ports);
                connectionsFuture.set(snapshot);
            }
        });
        return connectionsFuture;
    }

    /**
     * @return the metrics collector supplied at construction
     */
    public NetworkMetricsCollector getNetworkMetricCollector() {
        return m_metricCollector;
    }
}

