/*
 * Decompiled with CFR 0.152.
 */
package org.voltcore.network;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltcore.memory.DBBPool;
import org.voltcore.network.Connection;
import org.voltcore.network.NIOWriteStreamBase;
import org.voltcore.network.QueueMonitor;
import org.voltcore.network.WriteStream;
import org.voltcore.network.util.DeferredSerialization;
import org.voltcore.network.util.TimeProvider;

public class VoltNIOWriteStream
extends NIOWriteStreamBase
implements WriteStream {
    protected final Connection m_connection;
    protected static final Logger networkLog = LoggerFactory.getLogger((String)"NETWORK");
    private final ArrayDeque<DeferredSerialization> m_queuedWrites1 = new ArrayDeque();
    private final ArrayDeque<DeferredSerialization> m_queuedWrites2 = new ArrayDeque();
    protected ArrayDeque<DeferredSerialization> m_queuedWrites = this.m_queuedWrites1;
    protected volatile int m_maxQueuedWritesBeforeBackpressure = 100;
    private final Runnable m_offBackPressureCallback;
    private final Runnable m_onBackPressureCallback;
    protected final TimeProvider m_timeProvider;
    protected final QueueMonitor m_monitor;
    protected long m_lastPendingWriteTime = -1L;
    protected volatile boolean m_hadBackPressure = false;

    VoltNIOWriteStream(Connection port) {
        this(port, null, null, null, System::currentTimeMillis);
    }

    VoltNIOWriteStream(Connection port, Runnable offBackPressureCallback, Runnable onBackPressureCallback, QueueMonitor monitor, TimeProvider timeProvider) {
        this.m_connection = port;
        this.m_offBackPressureCallback = offBackPressureCallback;
        this.m_onBackPressureCallback = onBackPressureCallback;
        this.m_monitor = monitor;
        this.m_timeProvider = timeProvider;
    }

    @Override
    public synchronized int getOutstandingMessageCount() {
        return this.m_queuedWrites.size() + super.getOutstandingMessageCount();
    }

    @Override
    public synchronized boolean isEmpty() {
        return super.isEmpty() && this.m_queuedWrites.isEmpty();
    }

    @Override
    public boolean hadBackPressure() {
        return this.m_hadBackPressure;
    }

    protected synchronized ArrayDeque<DeferredSerialization> getQueuedWrites() {
        ArrayDeque<DeferredSerialization> oldlist;
        if (this.m_queuedWrites.isEmpty()) {
            return this.m_queuedWrites;
        }
        if (this.m_queuedWrites == this.m_queuedWrites1) {
            oldlist = this.m_queuedWrites1;
            this.m_queuedWrites = this.m_queuedWrites2;
        } else {
            oldlist = this.m_queuedWrites2;
            this.m_queuedWrites = this.m_queuedWrites1;
        }
        return oldlist;
    }

    protected final void backpressureStarted() {
        if (networkLog.isTraceEnabled()) {
            networkLog.trace("Backpressure started for client " + this.m_connection);
        }
        if (!this.m_hadBackPressure) {
            this.m_hadBackPressure = true;
            if (this.m_onBackPressureCallback != null) {
                this.m_onBackPressureCallback.run();
            }
        }
    }

    protected final void backpressureEnded() {
        if (networkLog.isTraceEnabled()) {
            networkLog.trace("Backpressure ended for client " + this.m_connection);
        }
        if (this.m_hadBackPressure) {
            this.m_hadBackPressure = false;
            if (this.m_offBackPressureCallback != null) {
                this.m_offBackPressureCallback.run();
            }
        }
    }

    protected void reportFailedToDrain() {
        if (!this.m_hadBackPressure) {
            this.backpressureStarted();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void enqueue(DeferredSerialization ds) {
        VoltNIOWriteStream voltNIOWriteStream = this;
        synchronized (voltNIOWriteStream) {
            if (this.m_isShutdown) {
                ds.cancel();
                return;
            }
            this.updateLastPendingWriteTimeAndQueueBackpressure();
            this.m_queuedWrites.offer(ds);
            this.m_connection.enableWriteSelection();
        }
    }

    @Override
    public void fastEnqueue(final DeferredSerialization ds) {
        this.m_connection.queueTask(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                VoltNIOWriteStream voltNIOWriteStream = VoltNIOWriteStream.this;
                synchronized (voltNIOWriteStream) {
                    VoltNIOWriteStream.this.updateLastPendingWriteTimeAndQueueBackpressure();
                    VoltNIOWriteStream.this.m_queuedWrites.offer(ds);
                    VoltNIOWriteStream.this.m_connection.enableWriteSelection();
                }
            }
        });
    }

    @Override
    public void enqueue(ByteBuffer b) {
        this.enqueue(new ByteBuffer[]{b});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void enqueue(final ByteBuffer[] b) {
        assert (b != null);
        for (ByteBuffer buf : b) {
            assert (!buf.isDirect());
            if (buf.remaining() != 0) continue;
            throw new IllegalArgumentException("Attempted to queue a zero length buffer");
        }
        VoltNIOWriteStream voltNIOWriteStream = this;
        synchronized (voltNIOWriteStream) {
            if (this.m_isShutdown) {
                return;
            }
            this.updateLastPendingWriteTimeAndQueueBackpressure();
            this.m_queuedWrites.offer(new DeferredSerialization(){

                @Override
                public void serialize(ByteBuffer outbuf) {
                    for (ByteBuffer buf : b) {
                        outbuf.put(buf);
                    }
                }

                @Override
                public int getSerializedSize() {
                    int sum = 0;
                    for (ByteBuffer buf : b) {
                        buf.position(0);
                        sum += buf.remaining();
                    }
                    return sum;
                }
            });
            this.m_connection.enableWriteSelection();
        }
    }

    @Override
    synchronized void shutdown() {
        super.shutdown();
        DeferredSerialization ds = null;
        while ((ds = this.m_queuedWrites.poll()) != null) {
            ds.cancel();
        }
    }

    @Override
    public synchronized int calculatePendingWriteDelta(long now) {
        if (this.m_lastPendingWriteTime == -1L) {
            return 0;
        }
        return (int)(now - this.m_lastPendingWriteTime);
    }

    private void updateLastPendingWriteTimeAndQueueBackpressure() {
        if (this.m_lastPendingWriteTime == -1L) {
            this.m_lastPendingWriteTime = this.m_timeProvider.getCurrentTime();
        }
        if (this.m_queuedWrites.size() > this.m_maxQueuedWritesBeforeBackpressure && !this.m_hadBackPressure) {
            this.backpressureStarted();
        }
    }

    @Override
    public void setPendingWriteBackpressureThreshold(int limit) {
        this.m_maxQueuedWritesBeforeBackpressure = Math.max(1, limit);
    }

    @Override
    protected void updateQueued(int queued, boolean noBackpressureSignal) {
        if (this.m_monitor != null) {
            boolean shouldSignalBackpressure = this.m_monitor.queue(queued);
            if (!noBackpressureSignal && shouldSignalBackpressure && !this.m_hadBackPressure) {
                this.backpressureStarted();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    int drainTo(GatheringByteChannel channel) throws IOException {
        int bytesWritten = 0;
        try {
            long rc = 0L;
            do {
                if (this.m_currentWriteBuffer == null && this.m_queuedBuffers.isEmpty()) {
                    int n = bytesWritten;
                    return n;
                }
                ByteBuffer buffer = null;
                if (this.m_currentWriteBuffer == null) {
                    this.m_currentWriteBuffer = (DBBPool.BBContainer)this.m_queuedBuffers.poll();
                    buffer = this.m_currentWriteBuffer.b();
                    buffer.flip();
                } else {
                    buffer = this.m_currentWriteBuffer.b();
                }
                rc = channel.write(buffer);
                if (buffer.hasRemaining()) {
                    if (!this.m_hadBackPressure) {
                        this.backpressureStarted();
                    }
                } else {
                    this.m_currentWriteBuffer.discard();
                    this.m_currentWriteBuffer = null;
                    ++this.m_messagesWritten;
                }
                bytesWritten = (int)((long)bytesWritten + rc);
            } while (rc > 0L);
        }
        finally {
            if (this.m_queuedBuffers.isEmpty() && this.m_hadBackPressure && this.m_queuedWrites.size() <= this.m_maxQueuedWritesBeforeBackpressure) {
                this.backpressureEnded();
            }
            if (!this.isEmpty()) {
                if (bytesWritten > 0) {
                    this.m_lastPendingWriteTime = this.m_timeProvider.getCurrentTime();
                }
            } else {
                this.m_lastPendingWriteTime = -1L;
            }
            if (bytesWritten > 0) {
                this.updateQueued(-bytesWritten, false);
                this.m_bytesWritten += (long)bytesWritten;
            }
        }
        return bytesWritten;
    }
}

