/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.com;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.neo4j.com.ChannelCloser;
import org.neo4j.com.ChunkingChannelBuffer;
import org.neo4j.com.CommittedTransactionSerializer;
import org.neo4j.com.Connection;
import org.neo4j.com.DechunkingChannelBuffer;
import org.neo4j.com.IdleChannelReaper;
import org.neo4j.com.IllegalProtocolVersionException;
import org.neo4j.com.MonitorChannelHandler;
import org.neo4j.com.NetworkFlushableChannel;
import org.neo4j.com.PortRangeSocketBinder;
import org.neo4j.com.Protocol;
import org.neo4j.com.ProtocolVersion;
import org.neo4j.com.RequestContext;
import org.neo4j.com.RequestType;
import org.neo4j.com.Response;
import org.neo4j.com.TxChecksumVerifier;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public abstract class Server<T, R>
extends SimpleChannelHandler
implements ChannelPipelineFactory,
Lifecycle,
ChannelCloser {
    private final LogProvider logProvider;
    private ScheduledExecutorService silentChannelExecutor;
    public static final int DEFAULT_MAX_NUMBER_OF_CONCURRENT_TRANSACTIONS = 200;
    static final byte INTERNAL_PROTOCOL_VERSION = 2;
    private final T requestTarget;
    private final IdleChannelReaper connectedSlaveChannels;
    private final Log msgLog;
    private final Map<Channel, PartialRequest> partialRequests = new ConcurrentHashMap<Channel, PartialRequest>();
    private final Configuration config;
    private final int frameLength;
    private final ByteCounterMonitor byteCounterMonitor;
    private final RequestMonitor requestMonitor;
    private final byte applicationProtocolVersion;
    private final TxChecksumVerifier txVerifier;
    private ServerBootstrap bootstrap;
    private ChannelGroup channelGroup;
    private ExecutorService targetCallExecutor;
    private volatile boolean shuttingDown;
    private InetSocketAddress socketAddress;
    private ExecutorService unfinishedTransactionExecutor;
    private int chunkSize;

    public Server(T requestTarget, Configuration config, LogProvider logProvider, int frameLength, ProtocolVersion protocolVersion, TxChecksumVerifier txVerifier, Clock clock, ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor) {
        this.requestTarget = requestTarget;
        this.config = config;
        this.frameLength = frameLength;
        this.applicationProtocolVersion = protocolVersion.getApplicationProtocol();
        this.logProvider = logProvider;
        this.msgLog = this.logProvider.getLog(this.getClass());
        this.txVerifier = txVerifier;
        this.byteCounterMonitor = byteCounterMonitor;
        this.requestMonitor = requestMonitor;
        this.connectedSlaveChannels = new IdleChannelReaper(this, logProvider, clock, config.getOldChannelThreshold());
        this.chunkSize = config.getChunkSize();
        Protocol.assertChunkSizeIsWithinFrameSize(this.chunkSize, frameLength);
    }

    private static void writeStoreId(StoreId storeId, ChannelBuffer targetBuffer) {
        targetBuffer.writeLong(storeId.getCreationTime());
        targetBuffer.writeLong(storeId.getRandomId());
        targetBuffer.writeLong(storeId.getStoreVersion());
        targetBuffer.writeLong(storeId.getUpgradeTime());
        targetBuffer.writeLong(storeId.getUpgradeId());
    }

    public void init() throws Throwable {
        this.chunkSize = this.config.getChunkSize();
        Protocol.assertChunkSizeIsWithinFrameSize(this.chunkSize, this.frameLength);
        String className = this.getClass().getSimpleName();
        this.targetCallExecutor = Executors.newCachedThreadPool((ThreadFactory)NamedThreadFactory.named((String)(className + ":" + this.config.getServerAddress().getPort())));
        this.unfinishedTransactionExecutor = Executors.newScheduledThreadPool(2, (ThreadFactory)NamedThreadFactory.named((String)"Unfinished transactions"));
        this.silentChannelExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)NamedThreadFactory.named((String)"Silent channel reaper"));
        this.silentChannelExecutor.scheduleWithFixedDelay(this.connectedSlaveChannels, 5L, 5L, TimeUnit.SECONDS);
    }

    public void start() throws Throwable {
        String className = this.getClass().getSimpleName();
        ExecutorService bossExecutor = Executors.newCachedThreadPool((ThreadFactory)NamedThreadFactory.daemon((String)("Boss-" + className)));
        ExecutorService workerExecutor = Executors.newCachedThreadPool((ThreadFactory)NamedThreadFactory.daemon((String)("Worker-" + className)));
        this.bootstrap = new ServerBootstrap((ChannelFactory)new NioServerSocketChannelFactory((Executor)bossExecutor, (Executor)workerExecutor, this.config.getMaxConcurrentTransactions()));
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)this);
        PortRangeSocketBinder portRangeSocketBinder = new PortRangeSocketBinder(this.bootstrap);
        try {
            Connection connection = portRangeSocketBinder.bindToFirstAvailablePortInRange(this.config.getServerAddress());
            Channel channel = connection.getChannel();
            this.socketAddress = connection.getSocketAddress();
            this.channelGroup = new DefaultChannelGroup();
            this.channelGroup.add((Object)channel);
            this.msgLog.info(className + " communication server started and bound to " + this.socketAddress);
        }
        catch (Exception ex) {
            this.msgLog.error("Failed to bind server to " + this.socketAddress, (Throwable)ex);
            this.bootstrap.releaseExternalResources();
            this.targetCallExecutor.shutdownNow();
            this.unfinishedTransactionExecutor.shutdownNow();
            this.silentChannelExecutor.shutdownNow();
            throw new IOException(ex);
        }
    }

    public void stop() throws Throwable {
        String name = this.getClass().getSimpleName();
        this.msgLog.info(name + " communication server shutting down and unbinding from  " + this.socketAddress);
        this.shuttingDown = true;
        this.channelGroup.close().awaitUninterruptibly();
        this.bootstrap.releaseExternalResources();
    }

    public void shutdown() throws Throwable {
        this.targetCallExecutor.shutdown();
        this.targetCallExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        this.unfinishedTransactionExecutor.shutdown();
        this.unfinishedTransactionExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        this.silentChannelExecutor.shutdown();
        this.silentChannelExecutor.awaitTermination(10L, TimeUnit.SECONDS);
    }

    public InetSocketAddress getSocketAddress() {
        return this.socketAddress;
    }

    protected byte getInternalProtocolVersion() {
        return 2;
    }

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("monitor", (ChannelHandler)new MonitorChannelHandler(this.byteCounterMonitor));
        Protocol.addLengthFieldPipes(pipeline, this.frameLength);
        pipeline.addLast("serverHandler", (ChannelHandler)this);
        return pipeline;
    }

    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.channelGroup.add((Object)e.getChannel());
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
        try {
            ChannelBuffer message = (ChannelBuffer)event.getMessage();
            this.handleRequest(message, event.getChannel());
        }
        catch (Throwable e) {
            this.msgLog.error("Error handling request", e);
            ChunkingChannelBuffer buffer = this.newChunkingBuffer(event.getChannel());
            buffer.clear(true);
            this.writeFailureResponse(e, buffer);
            ctx.getChannel().close();
            this.tryToCloseChannel(ctx.getChannel());
            throw Exceptions.launderedException((Throwable)e);
        }
    }

    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        if (this.connectedSlaveChannels.update(ctx.getChannel())) {
            super.writeComplete(ctx, e);
        }
    }

    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
        if (!ctx.getChannel().isOpen()) {
            this.tryToCloseChannel(ctx.getChannel());
        }
        this.channelGroup.remove((Object)e.getChannel());
    }

    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        if (!ctx.getChannel().isConnected()) {
            this.tryToCloseChannel(ctx.getChannel());
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        this.msgLog.warn("Exception from Netty", e.getCause());
    }

    @Override
    public void tryToCloseChannel(Channel channel) {
        IdleChannelReaper.Request request = this.unmapSlave(channel);
        if (request == null) {
            return;
        }
        this.tryToFinishOffChannel(channel, request.getRequestContext());
    }

    protected void tryToFinishOffChannel(Channel channel, RequestContext slave) {
        try {
            this.stopConversation(slave);
            this.unmapSlave(channel);
        }
        catch (Throwable failure) {
            this.submitSilent(this.unfinishedTransactionExecutor, this.newTransactionFinisher(slave));
            this.msgLog.warn("Could not finish off dead channel", failure);
        }
    }

    private void submitSilent(ExecutorService service, Runnable job) {
        block2: {
            try {
                service.submit(job);
            }
            catch (RejectedExecutionException e) {
                if (this.shuttingDown) break block2;
                throw e;
            }
        }
    }

    private Runnable newTransactionFinisher(final RequestContext slave) {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    Server.this.stopConversation(slave);
                }
                catch (Throwable e) {
                    this.sleepNicely(200);
                    Server.this.unfinishedTransactionExecutor.submit(this);
                }
            }

            private void sleepNicely(int millis) {
                try {
                    Thread.sleep(millis);
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
        };
    }

    protected void handleRequest(ChannelBuffer buffer, Channel channel) {
        Byte continuation = this.readContinuationHeader(buffer, channel);
        if (continuation == null) {
            return;
        }
        if (continuation == 1) {
            PartialRequest partialRequest = this.partialRequests.get(channel);
            if (partialRequest == null) {
                RequestType<T> type = this.getRequestContext(buffer.readByte());
                RequestContext context = this.readContext(buffer);
                ChannelBuffer targetBuffer = this.mapSlave(channel, context);
                partialRequest = new PartialRequest(type, context, targetBuffer);
                this.partialRequests.put(channel, partialRequest);
            }
            partialRequest.add(buffer);
        } else {
            ChannelBuffer bufferToWriteTo;
            ChannelBuffer bufferToReadFrom;
            RequestContext context;
            RequestType type;
            PartialRequest partialRequest = this.partialRequests.remove(channel);
            if (partialRequest == null) {
                type = this.getRequestContext(buffer.readByte());
                context = this.readContext(buffer);
                ChannelBuffer targetBuffer = this.mapSlave(channel, context);
                bufferToReadFrom = buffer;
                bufferToWriteTo = targetBuffer;
            } else {
                type = partialRequest.type;
                context = partialRequest.context;
                ChannelBuffer targetBuffer = partialRequest.buffer;
                partialRequest.add(buffer);
                bufferToReadFrom = targetBuffer;
                bufferToWriteTo = ChannelBuffers.dynamicBuffer();
            }
            bufferToWriteTo.clear();
            ChunkingChannelBuffer chunkingBuffer = this.newChunkingBuffer(bufferToWriteTo, channel, this.chunkSize, this.getInternalProtocolVersion(), this.applicationProtocolVersion);
            this.submitSilent(this.targetCallExecutor, new TargetCaller(type, channel, context, chunkingBuffer, bufferToReadFrom));
        }
    }

    private Byte readContinuationHeader(ChannelBuffer buffer, final Channel channel) {
        byte[] header = new byte[2];
        buffer.readBytes(header);
        try {
            DechunkingChannelBuffer.assertSameProtocolVersion(header, this.getInternalProtocolVersion(), this.applicationProtocolVersion);
        }
        catch (IllegalProtocolVersionException e) {
            this.submitSilent(this.targetCallExecutor, new Runnable(){

                @Override
                public void run() {
                    Server.this.writeFailureResponse(e, Server.this.newChunkingBuffer(channel));
                }
            });
            return null;
        }
        return (byte)(header[0] & 1);
    }

    protected void writeFailureResponse(Throwable exception, ChunkingChannelBuffer buffer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(exception);
            out.close();
            buffer.writeBytes(bytes.toByteArray());
            buffer.done();
        }
        catch (IOException e) {
            this.msgLog.warn("Couldn't send cause of error to client", exception);
        }
    }

    protected void responseWritten(RequestType<T> type, Channel channel, RequestContext context) {
    }

    protected RequestContext readContext(ChannelBuffer buffer) {
        long sessionId = buffer.readLong();
        int machineId = buffer.readInt();
        int eventIdentifier = buffer.readInt();
        long neoTx = buffer.readLong();
        long checksum = buffer.readLong();
        RequestContext readRequestContext = new RequestContext(sessionId, machineId, eventIdentifier, neoTx, checksum);
        if (neoTx > 1L) {
            this.txVerifier.assertMatch(neoTx, checksum);
        }
        return readRequestContext;
    }

    protected abstract RequestType<T> getRequestContext(byte var1);

    protected ChannelBuffer mapSlave(Channel channel, RequestContext slave) {
        if (slave != null && slave.machineId() != RequestContext.EMPTY.machineId()) {
            this.connectedSlaveChannels.add(channel, slave);
        }
        return ChannelBuffers.dynamicBuffer();
    }

    protected IdleChannelReaper.Request unmapSlave(Channel channel) {
        return this.connectedSlaveChannels.remove(channel);
    }

    protected T getRequestTarget() {
        return this.requestTarget;
    }

    protected abstract void stopConversation(RequestContext var1);

    private ChunkingChannelBuffer newChunkingBuffer(Channel channel) {
        return this.newChunkingBuffer(ChannelBuffers.dynamicBuffer(), channel, this.chunkSize, this.getInternalProtocolVersion(), this.applicationProtocolVersion);
    }

    protected ChunkingChannelBuffer newChunkingBuffer(ChannelBuffer bufferToWriteTo, Channel channel, int capacity, byte internalProtocolVersion, byte applicationProtocolVersion) {
        return new ChunkingChannelBuffer(bufferToWriteTo, channel, capacity, internalProtocolVersion, applicationProtocolVersion);
    }

    private class PartialRequest {
        final RequestContext context;
        final ChannelBuffer buffer;
        final RequestType<T> type;

        public PartialRequest(RequestType<T> type, RequestContext context, ChannelBuffer buffer) {
            this.type = type;
            this.context = context;
            this.buffer = buffer;
        }

        public void add(ChannelBuffer buffer) {
            this.buffer.writeBytes(buffer);
        }
    }

    private class TargetCaller
    implements Response.Handler,
    Runnable {
        private final RequestType<T> type;
        private final Channel channel;
        private final RequestContext context;
        private final ChunkingChannelBuffer targetBuffer;
        private final ChannelBuffer bufferToReadFrom;

        TargetCaller(RequestType<T> type, Channel channel, RequestContext context, ChunkingChannelBuffer targetBuffer, ChannelBuffer bufferToReadFrom) {
            this.type = type;
            this.channel = channel;
            this.context = context;
            this.targetBuffer = targetBuffer;
            this.bufferToReadFrom = bufferToReadFrom;
        }

        @Override
        public void run() {
            Server.this.requestMonitor.beginRequest(this.channel.getRemoteAddress(), this.type, this.context);
            Response response = null;
            Throwable failure = null;
            try {
                Server.this.unmapSlave(this.channel);
                response = this.type.getTargetCaller().call(Server.this.requestTarget, this.context, this.bufferToReadFrom, this.targetBuffer);
                this.type.getObjectSerializer().write(response.response(), this.targetBuffer);
                Server.writeStoreId(response.getStoreId(), this.targetBuffer);
                response.accept(this);
                this.targetBuffer.done();
                Server.this.responseWritten(this.type, this.channel, this.context);
            }
            catch (Throwable e) {
                failure = e;
                this.targetBuffer.clear(true);
                Server.this.writeFailureResponse(e, this.targetBuffer);
                Server.this.tryToFinishOffChannel(this.channel, this.context);
                throw Exceptions.launderedException((Throwable)e);
            }
            finally {
                if (response != null) {
                    response.close();
                }
                Server.this.requestMonitor.endRequest(failure);
            }
        }

        @Override
        public void obligation(long txId) throws IOException {
            this.targetBuffer.writeByte(-1);
            this.targetBuffer.writeLong(txId);
        }

        @Override
        public Visitor<CommittedTransactionRepresentation, Exception> transactions() {
            this.targetBuffer.writeByte(1);
            return new CommittedTransactionSerializer(new NetworkFlushableChannel(this.targetBuffer));
        }
    }

    public static interface Configuration {
        public long getOldChannelThreshold();

        public int getMaxConcurrentTransactions();

        public int getChunkSize();

        public HostnamePort getServerAddress();
    }
}

