/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.ServerDecoder;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.framing.ConnectionRedirectBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.ServerChannelMethodProcessor;
import org.apache.qpid.framing.ServerMethodDispatcher;
import org.apache.qpid.framing.ServerMethodProcessor;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.BrokerDecoder;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.Ticker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPConnection_0_8
extends AbstractAMQPConnection<AMQPConnection_0_8>
implements ServerMethodProcessor<ServerChannelMethodProcessor> {
    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_8.class);
    // Broker context key consulted by the constructor to size _binaryDataLimit.
    private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
    // Fallback for _binaryDataLimit when the context key above is absent.
    private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
    // Milliseconds; same value that sendConnectionClose adds to System.currentTimeMillis() for the closing ticker.
    private static final long CLOSE_OK_TIMEOUT = 10000L;
    private final AtomicBoolean _stateChanged = new AtomicBoolean();
    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference();
    // Monitor held while channels are added/removed and while _blocking is read or changed.
    private final Object _channelAddRemoveLock = new Object();
    // Channel id -> channel.
    private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<Integer, AMQChannel>();
    private volatile ConnectionState _state = ConnectionState.INIT;
    // Channels registered through channelRequiresSync(); drained by receivedCompleteAllChannels().
    private final Set<AMQChannel> _channelsForCurrentMessage = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ServerDecoder _decoder;
    private volatile SaslServer _saslServer;
    private volatile long _maxNoOfChannels;
    private volatile ProtocolVersion _protocolVersion;
    private volatile MethodRegistry _methodRegistry;
    private final Queue<Action<? super AMQPConnection_0_8>> _asyncTaskList = new ConcurrentLinkedQueue<Action<? super AMQPConnection_0_8>>();
    // Channel id -> System.currentTimeMillis() at the moment the channel was marked as awaiting close-ok.
    private final Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
    private volatile ProtocolOutputConverter _protocolOutputConverter;
    private final Object _reference = new Object();
    private volatile int _maxFrameSize;
    // Flipped to true (once) by sendConnectionClose(int, AMQFrame); reported by isClosing().
    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
    private final ServerNetworkConnection _network;
    private final ByteBufferSender _sender;
    // When true, writeFrame() does not flush the sender after writing.
    private volatile boolean _deferFlush;
    // Not volatile: accessed under _channelAddRemoveLock in addChannel(), block() and unblock().
    private boolean _blocking;
    private volatile boolean _closeWhenNoRoute;
    private volatile boolean _compressionSupported;
    private volatile int _messageCompressionThreshold;
    private volatile boolean _sendQueueDeleteOkRegardless;
    private final Pattern _sendQueueDeleteOkRegardlessClientVerRegexp;
    // Class/method ids echoed back in channel-close and connection-close bodies.
    private volatile int _currentClassId;
    private volatile int _currentMethodId;
    private final int _binaryDataLimit;
    // Long.MAX_VALUE when the port's "qpid.port.max_message_size" value is not positive.
    private final long _maxMessageSize;
    private volatile boolean _transportBlockedForWriting;
    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;

    /**
     * Builds a connection bound to the given network connection and port.
     *
     * <p>Reads the session-count limit and the close-when-no-route default from the broker,
     * the debug binary-data length and the queue-delete-ok client-version pattern from the
     * broker context (falling back to defaults when the keys are absent), and the maximum
     * message size from the port context. A non-positive maximum message size is treated as
     * unlimited ({@code Long.MAX_VALUE}).
     */
    public AMQPConnection_0_8(Broker<?> broker, ServerNetworkConnection network, AmqpPort<?> port, Transport transport, Protocol protocol, long connectionId, AggregateTicker aggregateTicker) {
        super(broker, network, port, transport, protocol, connectionId, aggregateTicker);
        _maxNoOfChannels = broker.getConnection_sessionCountLimit();
        _decoder = new BrokerDecoder(this);

        if (getBroker().getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)) {
            _binaryDataLimit = (Integer)getBroker().getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH);
        } else {
            _binaryDataLimit = DEFAULT_DEBUG_BINARY_DATA_LENGTH;
        }

        final String regexpKey = "connection.sendQueueDeleteOkRegardlessClientVerRegexp";
        String clientVersionRegexp = "";
        if (getBroker().getContextKeys(false).contains(regexpKey)) {
            clientVersionRegexp = (String)getBroker().getContextValue(String.class, regexpKey);
        }
        _sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(clientVersionRegexp);

        int configuredMaxMessageSize = (Integer)port.getContextValue(Integer.class, "qpid.port.max_message_size");
        if (configuredMaxMessageSize > 0) {
            _maxMessageSize = configuredMaxMessageSize;
        } else {
            _maxMessageSize = Long.MAX_VALUE;
        }

        _network = network;
        _sender = network.getSender();
        _closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
        logConnectionOpen();
    }

    /**
     * @return the flag last stored by {@link #setTransportBlockedForWriting(boolean)}
     */
    public boolean isTransportBlockedForWriting() {
        return _transportBlockedForWriting;
    }

    /**
     * Records whether the transport is blocked for writing and, only when the value actually
     * changes, notifies every channel currently in the channel map.
     *
     * @param blocked the new blocked-for-writing state
     */
    public void setTransportBlockedForWriting(boolean blocked) {
        if (_transportBlockedForWriting == blocked) {
            // No transition, so there is nothing to tell the channels.
            return;
        }
        _transportBlockedForWriting = blocked;
        for (AMQChannel session : _channelMap.values()) {
            session.transportStateChanged();
        }
    }

    /**
     * Stores the maximum frame size and passes the same value on to the decoder.
     *
     * @param frameMax maximum frame size
     */
    public void setMaxFrameSize(int frameMax) {
        _maxFrameSize = frameMax;
        _decoder.setMaxFrameSize(frameMax);
    }

    /**
     * @return the stored maximum frame size (an {@code int} field, widened to {@code long})
     */
    public long getMaxFrameSize() {
        return _maxFrameSize;
    }

    /**
     * @return the broker's network buffer size minus the per-frame overhead reported by
     *         {@code AMQFrame.getFrameOverhead()}
     */
    private int getDefaultMaxFrameSize() {
        int networkBufferSize = getBroker().getNetworkBufferSize();
        return networkBufferSize - AMQFrame.getFrameOverhead();
    }

    /**
     * @return {@code true} once the orderly-close flag has been set by a connection-close being sent
     */
    public boolean isClosing() {
        return _orderlyClose.get();
    }

    /**
     * Creates a new {@code WriteDeliverMethod} for the given channel.
     *
     * @param channelId id of the channel the delivery method is created for
     * @return a fresh delivery method instance
     */
    public ClientDeliveryMethod createDeliveryMethod(int channelId) {
        ClientDeliveryMethod deliveryMethod = new WriteDeliverMethod(channelId);
        return deliveryMethod;
    }

    /**
     * Decodes an incoming buffer under this connection's access-control context and then
     * signals receive-completion to every channel that asked for it.
     *
     * <p>I/O and frame-decoding failures are logged and rethrown as
     * {@code ConnectionScopedRuntimeException}. A {@code StoreException} is rethrown as
     * {@code ServerScopedRuntimeException} when the virtual host is in state {@code ACTIVE},
     * otherwise as {@code ConnectionScopedRuntimeException}.
     *
     * @param msg the bytes received from the network
     */
    public void received(final QpidByteBuffer msg) {
        PrivilegedAction<Void> decodeAction = new PrivilegedAction<Void>(){

            @Override
            public Void run() {
                updateLastReadTime();
                try {
                    _decoder.decodeBuffer(msg);
                    receivedCompleteAllChannels();
                }
                catch (IOException | AMQFrameDecodingException decodeFailure) {
                    _logger.error("Unexpected exception", decodeFailure);
                    throw new ConnectionScopedRuntimeException(decodeFailure);
                }
                catch (StoreException storeFailure) {
                    boolean hostActive = getVirtualHost().getState() == State.ACTIVE;
                    if (hostActive) {
                        throw new ServerScopedRuntimeException((Throwable)storeFailure);
                    }
                    throw new ConnectionScopedRuntimeException((Throwable)storeFailure);
                }
                return null;
            }
        };
        AccessController.doPrivileged(decodeAction, getAccessControllerContext());
    }

    /**
     * Calls {@code receivedComplete()} on every channel registered for the current message,
     * then clears the registration set.
     *
     * <p>Every channel is attempted even if an earlier one fails; each failure is logged and
     * the first one is rethrown after the set has been cleared.
     */
    private void receivedCompleteAllChannels() {
        RuntimeException firstFailure = null;
        for (AMQChannel pending : _channelsForCurrentMessage) {
            try {
                pending.receivedComplete();
            }
            catch (RuntimeException failure) {
                firstFailure = firstFailure != null ? firstFailure : failure;
                _logger.error("Error informing channel that receiving is complete. Channel: " + pending, (Throwable)failure);
            }
        }
        _channelsForCurrentMessage.clear();
        if (firstFailure == null) {
            return;
        }
        throw firstFailure;
    }

    /**
     * Registers a channel so that its {@code receivedComplete()} is invoked the next time
     * receive-completion is signalled to the registered channels.
     *
     * @param amqChannel the channel to register
     */
    void channelRequiresSync(AMQChannel amqChannel) {
        _channelsForCurrentMessage.add(amqChannel);
    }

    /**
     * Handles the client's protocol-initiation header.
     *
     * <p>On a supported version: records the protocol version, builds the space-separated list
     * of SASL mechanisms offered by the port's authentication provider, sends
     * {@code Connection.Start} with the server properties and moves the connection to
     * {@code AWAIT_START_OK}. If a {@code QpidException} is raised, a protocol-initiation
     * carrying the latest supported version is written back instead.
     *
     * <p>The mechanism and locale strings are encoded explicitly as US-ASCII so the bytes put
     * on the wire do not depend on the JVM's default charset.
     *
     * @param pi the protocol initiation received from the client
     */
    private synchronized void protocolInitiationReceived(ProtocolInitiation pi) {
        _decoder.setExpectProtocolInitiation(false);
        try {
            ProtocolVersion pv = pi.checkVersion();
            setProtocolVersion(pv);

            StringBuilder mechanismBuilder = new StringBuilder();
            SubjectCreator subjectCreator = getPort().getAuthenticationProvider().getSubjectCreator(getTransport().isSecure());
            for (String mechanismName : subjectCreator.getMechanisms()) {
                if (mechanismBuilder.length() != 0) {
                    mechanismBuilder.append(' ');
                }
                mechanismBuilder.append(mechanismName);
            }
            String mechanisms = mechanismBuilder.toString();
            String locales = "en_US";

            FieldTable serverProperties = FieldTableFactory.newFieldTable();
            serverProperties.setString("product", CommonProperties.getProductName());
            serverProperties.setString("version", CommonProperties.getReleaseVersion());
            serverProperties.setString("qpid.build", CommonProperties.getBuildVersion());
            serverProperties.setString("qpid.instance_name", getBroker().getName());
            serverProperties.setString("qpid.close_when_no_route", String.valueOf(_closeWhenNoRoute));
            serverProperties.setString("qpid.message_compression_supported", String.valueOf(getBroker().isMessageCompressionEnabled()));
            serverProperties.setString("qpid.confirmed_publish_supported", Boolean.TRUE.toString());
            serverProperties.setString("qpid.virtualhost_properties_supported", String.valueOf(getBroker().isVirtualHostPropertiesNodeEnabled()));

            // Fully-qualified so no new import is needed; avoids the platform-default charset.
            java.nio.charset.Charset wireCharset = java.nio.charset.StandardCharsets.US_ASCII;
            ConnectionStartBody responseBody = getMethodRegistry().createConnectionStartBody((short)getProtocolMajorVersion(), (short)pv.getActualMinorVersion(), serverProperties, mechanisms.getBytes(wireCharset), locales.getBytes(wireCharset));
            writeFrame((AMQDataBlock)responseBody.generateFrame(0));
            _state = ConnectionState.AWAIT_START_OK;
            _sender.flush();
        }
        catch (QpidException e) {
            // NOTE(review): if checkVersion() is what threw, setProtocolVersion() was never reached,
            // so the version logged here is whatever was stored before this call - confirm intent.
            _logger.debug("Received unsupported protocol initiation for protocol version: {} ", (Object)getProtocolVersion());
            writeFrame((AMQDataBlock)new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
            _sender.flush();
        }
    }

    /**
     * Writes a frame's payload to the sender, updates the last-write time and flushes the
     * sender unless flushing is currently deferred.
     *
     * @param frame the data block to send
     * @throws ServerScopedRuntimeException if writing the payload raises an {@code IOException}
     */
    public synchronized void writeFrame(AMQDataBlock frame) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("SEND: " + frame);
        }

        try {
            frame.writePayload(_sender);
        }
        catch (IOException ioFailure) {
            throw new ServerScopedRuntimeException((Throwable)ioFailure);
        }

        updateLastWriteTime();
        boolean flushNow = !_deferFlush;
        if (flushNow) {
            _sender.flush();
        }
    }

    /**
     * Looks up a channel by id.
     *
     * @param channelId the channel id
     * @return the channel, or {@code null} if none is mapped to the id or the mapped channel
     *         reports that it is closing
     */
    public AMQChannel getChannel(int channelId) {
        AMQChannel candidate = _channelMap.get(channelId);
        boolean usable = candidate != null && !candidate.isClosing();
        return usable ? candidate : null;
    }

    /**
     * @param channelId the channel id
     * @return {@code true} if the id is currently recorded as awaiting a close-ok
     */
    public boolean channelAwaitingClosure(int channelId) {
        if (_closingChannelsList.isEmpty()) {
            return false;
        }
        return _closingChannelsList.containsKey(channelId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a channel under its id and reports it via {@code sessionAdded}. If the
     * connection is currently blocking, the new channel is blocked immediately. All of this
     * happens while holding the channel add/remove lock.
     *
     * @param channel the channel to register
     */
    private void addChannel(AMQChannel channel) {
        synchronized (_channelAddRemoveLock) {
            _channelMap.put(channel.getChannelId(), channel);
            sessionAdded(channel);
            if (_blocking) {
                channel.block();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the channel mapped to the given id (under the channel add/remove lock) and then,
     * outside the lock, passes the removed value to {@code sessionRemoved}.
     *
     * @param channelId id of the channel to remove
     */
    private void removeChannel(int channelId) {
        AMQChannel removed;
        synchronized (_channelAddRemoveLock) {
            removed = _channelMap.remove(channelId);
        }
        // NOTE(review): 'removed' is null when no channel was mapped to the id - confirm sessionRemoved tolerates that.
        sessionRemoved(removed);
    }

    /**
     * @return the current channel limit for this connection
     */
    public long getMaximumNumberOfChannels() {
        return _maxNoOfChannels;
    }

    /**
     * Replaces the channel limit.
     *
     * @param value the new limit; unboxed, so must not be {@code null}
     */
    private void setMaximumNumberOfChannels(Long value) {
        _maxNoOfChannels = value.longValue();
    }

    /**
     * Closes a channel with no cause or message and without marking it as awaiting close-ok.
     *
     * @param channel the channel to close
     */
    void closeChannel(AMQChannel channel) {
        final AMQConstant noCause = null;
        final String noMessage = null;
        closeChannel(channel, noCause, noMessage, false);
    }

    /**
     * Sends a channel-close frame (carrying the cause code, message and the current class and
     * method ids) on the channel, then closes the channel and marks it as awaiting close-ok.
     *
     * @param channel the channel to close
     * @param cause   the reason code placed in the close body
     * @param message the reply text placed in the close body
     */
    public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message) {
        int channelId = channel.getChannelId();
        AMQBody closeBody = getMethodRegistry().createChannelCloseBody(cause.getCode(), AMQShortString.validValueOf((Object)message), _currentClassId, _currentMethodId);
        writeFrame(new AMQFrame(channelId, closeBody));
        closeChannel(channel, cause, message, true);
    }

    /**
     * Closes the channel with the given id and marks it as awaiting close-ok.
     *
     * @param channelId id of the channel to close
     * @param cause     reason passed to the channel's close
     * @param message   text passed to the channel's close
     * @throws IllegalArgumentException if {@link #getChannel(int)} yields no channel for the id
     */
    public void closeChannel(int channelId, AMQConstant cause, String message) {
        AMQChannel target = getChannel(channelId);
        if (target != null) {
            closeChannel(target, cause, message, true);
            return;
        }
        throw new IllegalArgumentException("Unknown channel id");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes a channel and always removes it from this connection afterwards, even if the
     * close itself throws.
     *
     * @param channel the channel to close
     * @param cause   reason passed to the channel's close
     * @param message text passed to the channel's close
     * @param mark    when {@code true} and the close succeeds, the channel id is recorded as
     *                awaiting close-ok
     */
    void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark) {
        final int id = channel.getChannelId();
        try {
            channel.close(cause, message);
            if (!mark) {
                return;
            }
            markChannelAwaitingCloseOk(id);
        }
        finally {
            removeChannel(id);
        }
    }

    /**
     * Forgets that the given channel id was awaiting a close-ok.
     *
     * @param channelId the channel id
     */
    public void closeChannelOk(int channelId) {
        _closingChannelsList.remove(channelId);
    }

    /**
     * Records the channel id as awaiting close-ok, stamped with the current wall-clock time.
     *
     * @param channelId the channel id
     */
    private void markChannelAwaitingCloseOk(int channelId) {
        long markedAt = System.currentTimeMillis();
        _closingChannelsList.put(channelId, markedAt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes every channel of this connection and then empties the channel map.
     *
     * <p>All channels are attempted even when some fail. Failures other than
     * {@code ConnectionScopedRuntimeException} are logged. The <em>first</em> failure
     * encountered is rethrown once every channel has been attempted (previously each later
     * failure overwrote the earlier one, so the last was thrown). The channel map is cleared
     * under the channel add/remove lock regardless of the outcome.
     */
    private void closeAllChannels() {
        try {
            RuntimeException firstException = null;
            for (AMQChannel channel : getSessionModels()) {
                try {
                    channel.close();
                }
                catch (RuntimeException re) {
                    if (!(re instanceof ConnectionScopedRuntimeException)) {
                        _logger.error("Unexpected exception closing channel", (Throwable)re);
                    }
                    // Keep the earliest failure, matching receivedCompleteAllChannels().
                    if (firstException == null) {
                        firstException = re;
                    }
                }
            }
            if (firstException != null) {
                throw firstException;
            }
        }
        finally {
            synchronized (_channelAddRemoveLock) {
                _channelMap.clear();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Signals receive-completion to the registered channels and then closes all channels.
     * The close step runs even if the completion step throws.
     */
    private void completeAndCloseAllChannels() {
        try {
            receivedCompleteAllChannels();
        }
        finally {
            closeAllChannels();
        }
    }

    /**
     * Builds a connection-close frame on channel 0 from the error code, message and the
     * current class and method ids, and sends it through the orderly-close path.
     *
     * @param errorCode reason code for the close
     * @param message   reply text for the close
     * @param channelId channel id that is marked as awaiting close-ok
     */
    void sendConnectionClose(AMQConstant errorCode, String message, int channelId) {
        ConnectionCloseBody closeBody = new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf((Object)message), _currentClassId, _currentMethodId);
        AMQFrame closeFrame = new AMQFrame(0, (AMQBody)closeBody);
        sendConnectionClose(channelId, closeFrame);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Performs the orderly close exactly once: the first caller to flip the orderly-close flag
     * marks the channel id as awaiting close-ok, completes and closes all channels, writes the
     * supplied frame, registers a {@code ConnectionClosingTicker} due {@code CLOSE_OK_TIMEOUT}
     * milliseconds from now, and notifies work. Later callers do nothing.
     *
     * <p>The frame is written even if closing the channels throws, and the ticker is
     * registered even if writing the frame throws.
     *
     * @param channelId channel id to mark as awaiting close-ok
     * @param frame     the frame to send to the peer
     */
    private void sendConnectionClose(int channelId, AMQFrame frame) {
        if (!_orderlyClose.compareAndSet(false, true)) {
            // Somebody else already started the orderly close.
            return;
        }
        try {
            markChannelAwaitingCloseOk(channelId);
            completeAndCloseAllChannels();
        }
        finally {
            try {
                writeFrame(frame);
            }
            finally {
                long closeDeadline = System.currentTimeMillis() + CLOSE_OK_TIMEOUT;
                getAggregateTicker().addTicker((Ticker)new ConnectionClosingTicker(closeDeadline, _network));
                notifyWork();
            }
        }
    }

    /**
     * Closes the underlying network connection.
     */
    public void closeNetworkConnection() {
        _network.close();
    }

    /**
     * @return the remote address followed by the authorized principal's name in parentheses,
     *         or {@code "?"} in place of the name when there is no authorized principal
     */
    public String toString() {
        Principal principal = getAuthorizedPrincipal();
        String principalName = principal == null ? "?" : principal.getName();
        return _network.getRemoteAddress() + "(" + principalName + ")";
    }

    /**
     * @return the host name of the local socket address
     * @throws IllegalArgumentException if the local address is not an {@code InetSocketAddress}
     */
    private String getLocalFQDN() {
        SocketAddress localAddress = _network.getLocalAddress();
        if (!(localAddress instanceof InetSocketAddress)) {
            throw new IllegalArgumentException("Unsupported socket address class: " + localAddress);
        }
        InetSocketAddress inetAddress = (InetSocketAddress)localAddress;
        return inetAddress.getHostName();
    }

    /**
     * @return the SASL server currently stored on this connection, possibly {@code null}
     */
    private SaslServer getSaslServer() {
        return _saslServer;
    }

    /**
     * Stores the SASL server for this connection.
     *
     * @param saslServer the SASL server to store
     */
    private void setSaslServer(SaslServer saslServer) {
        _saslServer = saslServer;
    }

    /**
     * @return the flag last stored by {@code setSendQueueDeleteOkRegardless}
     */
    public boolean isSendQueueDeleteOkRegardless() {
        return _sendQueueDeleteOkRegardless;
    }

    /**
     * Stores the send-queue-delete-ok-regardless flag.
     *
     * @param sendQueueDeleteOkRegardless the new flag value
     */
    void setSendQueueDeleteOkRegardless(boolean sendQueueDeleteOkRegardless) {
        _sendQueueDeleteOkRegardless = sendQueueDeleteOkRegardless;
    }

    /**
     * Applies the client-supplied connection properties; does nothing when the table is
     * {@code null}.
     *
     * <ul>
     *   <li>{@code qpid.close_when_no_route} and {@code qpid.message_compression_supported},
     *       when present, override the corresponding connection flags.</li>
     *   <li>{@code instance}, {@code version}, {@code product} and {@code qpid.client_pid} are
     *       recorded as client id, version, product and remote process pid. A missing
     *       {@code instance} is replaced by a random UUID.</li>
     *   <li>The queue-delete-ok-regardless flag is set when the product name contains "qpid" or
     *       equals "unknown" (case-insensitively) and the version is either absent or matches
     *       the configured client-version pattern.</li>
     * </ul>
     *
     * <p>The product name is lower-cased with {@code Locale.ROOT} so the comparison does not
     * depend on the JVM's default locale (e.g. the Turkish dotless-i mapping of "QPID").
     *
     * @param clientProperties the properties sent by the client, may be {@code null}
     */
    private void setClientProperties(FieldTable clientProperties) {
        if (clientProperties == null) {
            return;
        }

        String closeWhenNoRoute = clientProperties.getString("qpid.close_when_no_route");
        if (closeWhenNoRoute != null) {
            _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
            _logger.debug("Client set closeWhenNoRoute={} for connection {}", (Object)_closeWhenNoRoute, (Object)this);
        }

        String compressionSupported = clientProperties.getString("qpid.message_compression_supported");
        if (compressionSupported != null) {
            _compressionSupported = Boolean.parseBoolean(compressionSupported);
            _logger.debug("Client set compressionSupported={} for connection {}", (Object)_compressionSupported, (Object)this);
        }

        String clientId = clientProperties.getString("instance");
        String clientVersion = clientProperties.getString("version");
        String clientProduct = clientProperties.getString("product");
        String remoteProcessPid = clientProperties.getString("qpid.client_pid");

        boolean mightBeQpidClient = false;
        if (clientProduct != null) {
            // Fully-qualified so no new import is needed; Locale.ROOT keeps the match locale-independent.
            String normalisedProduct = clientProduct.toLowerCase(java.util.Locale.ROOT);
            mightBeQpidClient = normalisedProduct.contains("qpid") || normalisedProduct.equals("unknown");
        }
        boolean sendQueueDeleteOkRegardless = mightBeQpidClient && (clientVersion == null || _sendQueueDeleteOkRegardlessClientVerRegexp.matcher(clientVersion).matches());
        setSendQueueDeleteOkRegardless(sendQueueDeleteOkRegardless);
        if (sendQueueDeleteOkRegardless) {
            _logger.debug("Peer is an older Qpid client, queue delete-ok response will be sent regardless for connection {}", (Object)this);
        }

        setClientVersion(clientVersion);
        setClientProduct(clientProduct);
        setRemoteProcessPid(remoteProcessPid);
        setClientId(clientId == null ? UUID.randomUUID().toString() : clientId);
    }

    /**
     * Stores the negotiated protocol version and creates a fresh method registry for that
     * version and a new protocol output converter for this connection.
     *
     * @param pv the protocol version to adopt
     */
    private void setProtocolVersion(ProtocolVersion pv) {
        _protocolVersion = pv;
        _methodRegistry = new MethodRegistry(_protocolVersion);
        _protocolOutputConverter = new ProtocolOutputConverterImpl(this);
    }

    /**
     * @return the major component of the stored protocol version
     */
    public byte getProtocolMajorVersion() {
        return _protocolVersion.getMajorVersion();
    }

    /**
     * @return the stored protocol version; {@code null} until one has been set
     */
    public ProtocolVersion getProtocolVersion() {
        return _protocolVersion;
    }

    /**
     * @return the minor component of the stored protocol version
     */
    public byte getProtocolMinorVersion() {
        return _protocolVersion.getMinorVersion();
    }

    /**
     * Alias for {@link #getMethodRegistry()}.
     *
     * @return the method registry for the current protocol version
     */
    public MethodRegistry getRegistry() {
        MethodRegistry registry = getMethodRegistry();
        return registry;
    }

    /**
     * Associates this connection with a virtual host.
     *
     * <p>Reads the message-compression threshold from the host's context (a non-positive
     * value is replaced by {@code Integer.MAX_VALUE}), adds the host's principal to this
     * connection's subject, refreshes the access-control context and logs the connection open.
     *
     * @param virtualHost the virtual host to associate with
     */
    public void setVirtualHost(VirtualHostImpl<?, ?, ?> virtualHost) {
        associateVirtualHost((VirtualHost)virtualHost);

        _messageCompressionThreshold = (Integer)virtualHost.getContextValue(Integer.class, "connection.messageCompressionThresholdSize");
        boolean thresholdUnset = _messageCompressionThreshold <= 0;
        if (thresholdUnset) {
            _messageCompressionThreshold = Integer.MAX_VALUE;
        }

        getSubject().getPrincipals().add(virtualHost.getPrincipal());
        updateAccessControllerContext();
        logConnectionOpen();
    }

    /**
     * @return the output converter created when the protocol version was set
     */
    public ProtocolOutputConverter getProtocolOutputConverter() {
        return _protocolOutputConverter;
    }

    /**
     * Merges the principals and the private and public credentials of the given subject into
     * this connection's subject, then refreshes the access-control context.
     *
     * @param authorizedSubject the authenticated subject; must not be {@code null}
     * @throws IllegalArgumentException if {@code authorizedSubject} is {@code null}
     */
    public void setAuthorizedSubject(Subject authorizedSubject) {
        if (null == authorizedSubject) {
            throw new IllegalArgumentException("authorizedSubject cannot be null");
        }

        getSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
        getSubject().getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
        getSubject().getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());

        updateAccessControllerContext();
    }

    /**
     * @return this connection's subject
     */
    public Subject getAuthorizedSubject() {
        Subject subject = getSubject();
        return subject;
    }

    /**
     * @return the authenticated principal taken from this connection's subject, or
     *         {@code null} when the subject holds no {@code AuthenticatedPrincipal}
     */
    public Principal getAuthorizedPrincipal() {
        if (getSubject().getPrincipals(AuthenticatedPrincipal.class).isEmpty()) {
            return null;
        }
        return AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(getSubject());
    }

    /**
     * @return the peer principal reported by the network connection
     */
    public Principal getPeerPrincipal() {
        return _network.getPeerPrincipal();
    }

    /**
     * @return the method registry created when the protocol version was set
     */
    public MethodRegistry getMethodRegistry() {
        return _methodRegistry;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closed() {
        try {
            try {
                if (!this._orderlyClose.get()) {
                    this.completeAndCloseAllChannels();
                }
            }
            finally {
                this.performDeleteTasks();
                VirtualHost virtualHost = this.getVirtualHost();
                if (virtualHost != null) {
                    virtualHost.deregisterConnection((AMQPConnection)this);
                }
            }
        }
        catch (ConnectionScopedRuntimeException | TransportException e) {
            try {
                _logger.error("Could not close protocol engine", e);
            }
            catch (Throwable throwable) {
                this.markTransportClosed();
                this.runAsSubject(new PrivilegedAction<Void>(){

                    @Override
                    public Void run() {
                        AMQPConnection_0_8.this.getEventLogger().message(AMQPConnection_0_8.this._orderlyClose.get() ? ConnectionMessages.CLOSE() : ConnectionMessages.DROPPED_CONNECTION());
                        return null;
                    }
                });
                throw throwable;
            }
            this.markTransportClosed();
            this.runAsSubject(new /* invalid duplicate definition of identical inner class */);
        }
        this.markTransportClosed();
        this.runAsSubject(new /* invalid duplicate definition of identical inner class */);
    }

    /**
     * Intentionally empty: this connection takes no action here.
     */
    public void encryptedTransport() {
    }

    /**
     * Handles the reader having gone idle: under this connection's access-control context,
     * logs an IDLE_CLOSE event that includes the current connection state and then closes the
     * network connection.
     */
    public void readerIdle() {
        PrivilegedAction<Object> idleCloseAction = new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                String stateDescription = "Current connection state: " + _state;
                getEventLogger().message(ConnectionMessages.IDLE_CLOSE(stateDescription, true));
                _network.close();
                return null;
            }
        };
        AccessController.doPrivileged(idleCloseAction, getAccessControllerContext());
    }

    /**
     * Handles the writer having gone idle by sending a heartbeat frame.
     */
    public synchronized void writerIdle() {
        writeFrame((AMQDataBlock)HeartbeatBody.FRAME);
    }

    /**
     * @return the same value as {@link #getMaximumNumberOfChannels()}
     */
    public long getSessionCountLimit() {
        long channelLimit = getMaximumNumberOfChannels();
        return channelLimit;
    }

    /**
     * @return the string form of the remote address ({@code "null"} if there is none)
     */
    public String getAddress() {
        Object remoteAddress = _network.getRemoteAddress();
        return String.valueOf(remoteAddress);
    }

    /**
     * Queues an asynchronous task that closes the session's channel (marking it as awaiting
     * close-ok) and then writes a channel-close frame with class and method id 0.
     *
     * @param session the session whose channel is to be closed
     * @param cause   reason code for the close
     * @param message reply text for the close
     */
    public void closeSessionAsync(final AMQSessionModel<?> session, final AMQConstant cause, final String message) {
        Action<AMQPConnection_0_8> closeTask = new Action<AMQPConnection_0_8>(){

            public void performAction(AMQPConnection_0_8 object) {
                final int id = session.getChannelId();
                AMQPConnection_0_8.this.closeChannel(id, cause, message);
                ChannelCloseBody closeBody = AMQPConnection_0_8.this.getMethodRegistry().createChannelCloseBody(cause.getCode(), AMQShortString.validValueOf((Object)message), 0, 0);
                AMQPConnection_0_8.this.writeFrame((AMQDataBlock)closeBody.generateFrame(id));
            }
        };
        addAsyncTask(closeTask);
    }

    /**
     * Queues an asynchronous task that sends a connection-close. The close frame is obtained
     * from an {@code AMQConnectionException} built from the cause and message, and is sent
     * through the orderly-close path with channel id 0.
     *
     * @param cause   reason code for the close
     * @param message reply text for the close
     */
    public void sendConnectionCloseAsync(final AMQConstant cause, final String message) {
        addAsyncTask(new Action<AMQPConnection_0_8>(){

            public void performAction(AMQPConnection_0_8 object) {
                MethodRegistry registry = AMQPConnection_0_8.this.getMethodRegistry();
                AMQConnectionException closeReason = new AMQConnectionException(cause, message, 0, 0, registry, null);
                AMQPConnection_0_8.this.sendConnectionClose(0, closeReason.getCloseFrame());
            }
        });
    }

    /**
     * Appends a task to the asynchronous task queue and notifies work.
     *
     * @param action the task to enqueue
     */
    private void addAsyncTask(Action<AMQPConnection_0_8> action) {
        _asyncTaskList.add(action);
        notifyWork();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Puts the connection into the blocking state and blocks every current channel. Does
     * nothing if the connection is already blocking. Runs under the channel add/remove lock.
     */
    public void block() {
        synchronized (_channelAddRemoveLock) {
            if (_blocking) {
                return;
            }
            _blocking = true;
            for (AMQChannel session : _channelMap.values()) {
                session.block();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes the connection out of the blocking state and unblocks every current channel. Does
     * nothing if the connection is not blocking. Runs under the channel add/remove lock.
     */
    public void unblock() {
        synchronized (_channelAddRemoveLock) {
            if (!_blocking) {
                return;
            }
            _blocking = false;
            for (AMQChannel session : _channelMap.values()) {
                session.unblock();
            }
        }
    }

    /**
     * @return a new list holding a snapshot of the channels currently in the channel map
     */
    public List<AMQChannel> getSessionModels() {
        List<AMQChannel> snapshot = new ArrayList<AMQChannel>(_channelMap.values());
        return snapshot;
    }

    /**
     * @return the client id, which this connection uses as the remote container name
     */
    public String getRemoteContainerName() {
        String clientId = getClientId();
        return clientId;
    }

    /**
     * Controls whether {@link #writeFrame} flushes the sender after each frame.
     *
     * @param deferFlush {@code true} to suppress the per-frame flush
     */
    public void setDeferFlush(boolean deferFlush) {
        _deferFlush = deferFlush;
    }

    /**
     * Always reports that no session with the given name exists.
     *
     * @param name the session name (ignored)
     * @return always {@code false}
     */
    public boolean hasSessionWithName(byte[] name) {
        final boolean found = false;
        return found;
    }

    /**
     * Handles a channel-open request.
     *
     * <p>Requires the connection to be in state {@code OPEN}. The connection is closed with an
     * error when no virtual host has been set, when the channel id is already in use or still
     * awaiting close-ok, or when the id exceeds the channel limit. Otherwise a new channel is
     * created on the virtual host's message store, registered, and a channel-open-ok is sent.
     *
     * @param channelId the id of the channel the client wants to open
     */
    public void receiveChannelOpen(int channelId) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + channelId + "] ChannelOpen");
        }
        assertState(ConnectionState.OPEN);

        VirtualHost virtualHost = getVirtualHost();
        if (virtualHost == null) {
            sendConnectionClose(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
            return;
        }
        if (getChannel(channelId) != null || channelAwaitingClosure(channelId)) {
            sendConnectionClose(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
            return;
        }
        if ((long)channelId > getMaximumNumberOfChannels()) {
            sendConnectionClose(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " cannot be created as the max allowed channel id is " + getMaximumNumberOfChannels(), channelId);
            return;
        }

        _logger.debug("Connecting to: {}", (Object)virtualHost.getName());
        AMQChannel opened = new AMQChannel(this, channelId, virtualHost.getMessageStore());
        addChannel(opened);
        ChannelOpenOkBody openOk = getMethodRegistry().createChannelOpenOkBody();
        writeFrame((AMQDataBlock)openOk.generateFrame(channelId));
    }

    /**
     * Verifies that the connection is in the required state. If it is not, a connection-close
     * with {@code COMMAND_INVALID} is sent and an exception is thrown.
     *
     * @param requiredState the state the connection must currently be in
     * @throws ConnectionScopedRuntimeException if the current state differs
     */
    void assertState(ConnectionState requiredState) {
        if (_state == requiredState) {
            return;
        }
        String replyText = "Command Invalid, expected " + requiredState + " but was " + _state;
        sendConnectionClose(AMQConstant.COMMAND_INVALID, replyText, 0);
        throw new ConnectionScopedRuntimeException(replyText);
    }

    /**
     * Handles a ConnectionOpen request naming the virtual host the peer wants to use.
     *
     * <p>The connection must be in the {@code AWAIT_OPEN} state. A single leading {@code '/'}
     * is stripped from the requested name before the port is asked for the virtual host.
     * The connection is closed when the host is unknown, when it is not {@code ACTIVE}
     * (a redirect is sent instead if the host supplies a redirect target), or when
     * authorisation is refused or fails. On success a ConnectionOpenOk frame is written and
     * the state becomes {@code OPEN}.
     *
     * @param virtualHostName the requested virtual host name
     * @param capabilities    capabilities sent by the peer (only logged here)
     * @param insist          insist flag sent by the peer (only logged here)
     */
    public void receiveConnectionOpen(AMQShortString virtualHostName, AMQShortString capabilities, boolean insist) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ConnectionOpen[ virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]");
        }
        this.assertState(ConnectionState.AWAIT_OPEN);
        String virtualHostStr = AMQShortString.toString((AMQShortString)virtualHostName);
        // startsWith is safe for an empty name; charAt(0) would throw StringIndexOutOfBoundsException.
        if (virtualHostStr != null && virtualHostStr.startsWith("/")) {
            virtualHostStr = virtualHostStr.substring(1);
        }
        VirtualHostImpl virtualHost = this.getPort().getVirtualHost(virtualHostStr);
        if (virtualHost == null) {
            this.sendConnectionClose(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", 0);
        } else if (virtualHost.getState() != State.ACTIVE) {
            String redirectHost = virtualHost.getRedirectHost(this.getPort());
            if (redirectHost != null) {
                this.sendConnectionClose(0, new AMQFrame(0, (AMQBody)new ConnectionRedirectBody(this.getProtocolVersion(), AMQShortString.valueOf((String)redirectHost), null)));
            } else {
                this.sendConnectionClose(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active", 0);
            }
        } else {
            try {
                // The virtual host is recorded on the connection before authorisation is checked.
                this.setVirtualHost(virtualHost);
                if (virtualHost.authoriseCreateConnection((AMQPConnection)this)) {
                    MethodRegistry methodRegistry = this.getMethodRegistry();
                    ConnectionOpenOkBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
                    this.writeFrame((AMQDataBlock)responseBody.generateFrame(0));
                    this._state = ConnectionState.OPEN;
                } else {
                    this.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Connection refused", 0);
                }
            }
            catch (AccessControlException | VirtualHostUnavailableException e) {
                this.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), 0);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a ConnectionClose sent by the peer.
     *
     * <p>If this is the first orderly close, all channels are completed and closed. A
     * ConnectionCloseOk frame is then written. Any exception raised on the way is logged,
     * and the network connection is closed in every case.
     *
     * @param replyCode the reply code sent by the peer (only logged here)
     * @param replyText the reply text sent by the peer (only logged here)
     * @param classId   the class id sent by the peer (only logged here)
     * @param methodId  the method id sent by the peer (only logged here)
     */
    public void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ConnectionClose[ replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
        }
        try {
            boolean firstOrderlyClose = _orderlyClose.compareAndSet(false, true);
            if (firstOrderlyClose) {
                completeAndCloseAllChannels();
            }
            ConnectionCloseOkBody closeOk = getMethodRegistry().createConnectionCloseOkBody();
            writeFrame((AMQDataBlock)closeOk.generateFrame(0));
        } catch (Exception e) {
            _logger.error("Error closing connection for " + getRemoteAddressString(), (Throwable)e);
        } finally {
            closeNetworkConnection();
        }
    }

    /**
     * Handles a ConnectionCloseOk from the peer by closing the network connection.
     */
    public void receiveConnectionCloseOk() {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ConnectionCloseOk");
        }
        closeNetworkConnection();
    }

    /**
     * Handles a ConnectionSecureOk carrying the peer's response to a SASL challenge.
     *
     * <p>The connection must be in the {@code AWAIT_SECURE_OK} state. If no SASL server is
     * present the connection is closed with {@code INTERNAL_ERROR} and the response is not
     * processed; otherwise the response is passed on to the SASL exchange.
     *
     * @param response the SASL response bytes (never logged)
     */
    public void receiveConnectionSecureOk(byte[] response) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
        }
        this.assertState(ConnectionState.AWAIT_SECURE_OK);
        SubjectCreator subjectCreator = this.getSubjectCreator();
        SaslServer ss = this.getSaslServer();
        if (ss == null) {
            this.sendConnectionClose(AMQConstant.INTERNAL_ERROR, "No SASL context set up in connection", 0);
            // Stop here: continuing would hand a null SASL server to the authentication step.
            return;
        }
        this.processSaslResponse(response, subjectCreator, ss);
    }

    /**
     * Detaches the SASL server from the connection, if there is one, and disposes of it.
     *
     * <p>The reference is cleared before {@code dispose()} is called, so it is gone even
     * when disposal fails. A disposal failure is logged, not propagated.
     */
    private void disposeSaslServer() {
        SaslServer ss = this.getSaslServer();
        if (ss == null) {
            return;
        }
        this.setSaslServer(null);
        try {
            ss.dispose();
        }
        catch (SaslException e) {
            // Pass the exception as the throwable so the stack trace is not lost.
            _logger.error("Error disposing of Sasl server: " + e, (Throwable)e);
        }
    }

    /**
     * Handles a ConnectionStartOk, which selects the SASL mechanism and carries the first response.
     *
     * <p>The connection must be in the {@code AWAIT_START_OK} state. A SASL server is created
     * for the chosen mechanism; if none can be created the connection is closed with
     * {@code RESOURCE_ERROR}. Otherwise the client properties and SASL server are stored and
     * the initial response is processed. A {@code SaslException} disposes of any SASL server
     * and closes the connection with {@code INTERNAL_ERROR}.
     *
     * @param clientProperties properties supplied by the client
     * @param mechanism        the SASL mechanism chosen by the client
     * @param response         the initial SASL response bytes (never logged)
     * @param locale           the locale chosen by the client (only logged here)
     */
    public void receiveConnectionStartOk(FieldTable clientProperties, AMQShortString mechanism, byte[] response, AMQShortString locale) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ConnectionStartOk[ clientProperties: " + clientProperties + " mechanism: " + mechanism + " response: ********" + " locale: " + locale + " ]");
        }
        assertState(ConnectionState.AWAIT_START_OK);
        _logger.debug("SASL Mechanism selected: {} Locale : {}", (Object)mechanism, (Object)locale);

        SubjectCreator creator = getSubjectCreator();
        try {
            SaslServer saslServer = creator.createSaslServer(String.valueOf(mechanism), getLocalFQDN(), getPeerPrincipal());
            if (saslServer != null) {
                setClientProperties(clientProperties);
                setSaslServer(saslServer);
                processSaslResponse(response, creator, saslServer);
            } else {
                sendConnectionClose(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
            }
        } catch (SaslException e) {
            disposeSaslServer();
            sendConnectionClose(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0);
        }
    }

    /**
     * Advances the SASL exchange with the given response and acts on the outcome.
     *
     * <p>If a successful authentication result has already been recorded it is reused and no
     * further authentication call is made; otherwise the response is authenticated and the
     * resulting challenge is read.
     * <ul>
     *   <li>{@code ERROR}: the connection is closed with {@code NOT_ALLOWED} and the SASL
     *       server is disposed of.</li>
     *   <li>{@code SUCCESS}: the result is recorded. With no challenge outstanding, the
     *       subject is authorised, a ConnectionTune frame is written, the state becomes
     *       {@code AWAIT_TUNE_OK} and the SASL server is disposed of. With a non-empty
     *       challenge, the challenge is sent to the peer first.</li>
     *   <li>{@code CONTINUE}: the challenge is sent to the peer.</li>
     * </ul>
     *
     * @param response       the SASL response bytes received from the peer
     * @param subjectCreator the creator used to authenticate the response
     * @param ss             the SASL server for this exchange
     */
    private void processSaslResponse(byte[] response, SubjectCreator subjectCreator, SaslServer ss) {
        MethodRegistry registry = getMethodRegistry();
        SubjectAuthenticationResult result = _successfulAuthenticationResult;
        byte[] challenge = null;
        if (result == null) {
            result = subjectCreator.authenticate(ss, response);
            challenge = result.getChallenge();
        }
        switch (result.getStatus()) {
            case ERROR: {
                Exception cause = result.getCause();
                String detail = cause == null ? "" : cause.getMessage();
                _logger.debug("Authentication failed: {}", (Object)detail);
                sendConnectionClose(AMQConstant.NOT_ALLOWED, "Authentication failed", 0);
                disposeSaslServer();
                break;
            }
            case SUCCESS: {
                _successfulAuthenticationResult = result;
                boolean challengeOutstanding = challenge != null && challenge.length != 0;
                if (challengeOutstanding) {
                    // The mechanism still has data for the peer; tune is sent on the next SecureOk.
                    continueSaslNegotiation(challenge);
                } else {
                    _logger.debug("Connected as: {}", (Object)result.getSubject());
                    setAuthorizedSubject(result.getSubject());
                    int configuredFrameMax = getDefaultMaxFrameSize();
                    // A non-positive configured size is advertised as Integer.MAX_VALUE.
                    int frameMax = configuredFrameMax > 0 ? configuredFrameMax : Integer.MAX_VALUE;
                    Broker broker = getBroker();
                    ConnectionTuneBody tune = registry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), (long)frameMax, broker.getConnection_heartBeatDelay());
                    writeFrame((AMQDataBlock)tune.generateFrame(0));
                    _state = ConnectionState.AWAIT_TUNE_OK;
                    disposeSaslServer();
                }
                break;
            }
            case CONTINUE: {
                continueSaslNegotiation(challenge);
                break;
            }
        }
    }

    /**
     * Sends the given SASL challenge to the peer in a ConnectionSecure frame and moves the
     * connection to the {@code AWAIT_SECURE_OK} state.
     *
     * @param challenge the challenge bytes to send
     */
    private void continueSaslNegotiation(byte[] challenge) {
        MethodRegistry registry = getMethodRegistry();
        ConnectionSecureBody secure = registry.createConnectionSecureBody(challenge);
        writeFrame((AMQDataBlock)secure.generateFrame(0));
        _state = ConnectionState.AWAIT_SECURE_OK;
    }

    /**
     * Handles a ConnectionTuneOk carrying the peer's chosen connection limits.
     *
     * <p>The connection must be in the {@code AWAIT_TUNE_OK} state. A positive heartbeat
     * starts heartbeating, with the reader delay scaled by
     * {@code BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR}. The connection is closed with
     * {@code SYNTAX_ERROR} when the requested frame size is above the broker limit or is
     * positive but below {@code FRAME_MIN_SIZE}. Otherwise the frame size and channel limit
     * are applied: a frame size of 0 selects the broker limit, and a channel maximum of 0 or
     * above 65535 selects 65535. The state becomes {@code AWAIT_OPEN} in every case,
     * including after a rejected tune.
     *
     * @param channelMax the maximum number of channels requested by the peer
     * @param frameMax   the maximum frame size requested by the peer
     * @param heartbeat  the heartbeat interval requested by the peer; the delays are computed as 1000 times this value
     */
    public void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ConnectionTuneOk[ channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
        }
        assertState(ConnectionState.AWAIT_TUNE_OK);

        if (heartbeat > 0) {
            long writerDelay = 1000L * (long)heartbeat;
            long readerDelay = 1000L * (long)BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * (long)heartbeat;
            initialiseHeartbeating(writerDelay, readerDelay);
        }

        int configuredFrameMax = getDefaultMaxFrameSize();
        // A non-positive configured size means the broker limit is Integer.MAX_VALUE.
        int brokerFrameMax = configuredFrameMax > 0 ? configuredFrameMax : Integer.MAX_VALUE;

        if (frameMax > (long)brokerFrameMax) {
            sendConnectionClose(AMQConstant.SYNTAX_ERROR, "Attempt to set max frame size to " + frameMax + " greater than the broker will allow: " + brokerFrameMax, 0);
        } else if (frameMax > 0L && frameMax < (long)AMQConstant.FRAME_MIN_SIZE.getCode()) {
            sendConnectionClose(AMQConstant.SYNTAX_ERROR, "Attempt to set max frame size to " + frameMax + " which is smaller than the specification defined minimum: " + AMQConstant.FRAME_MIN_SIZE.getCode(), 0);
        } else {
            int negotiatedFrameMax = brokerFrameMax;
            if (frameMax != 0L) {
                negotiatedFrameMax = (int)frameMax;
            }
            setMaxFrameSize(negotiatedFrameMax);

            boolean useProtocolLimit = channelMax == 0 || channelMax > 65535;
            setMaximumNumberOfChannels(useProtocolLimit ? 65535L : (long)channelMax);
        }
        _state = ConnectionState.AWAIT_OPEN;
    }

    /**
     * Returns the binary data limit held in {@code _binaryDataLimit}.
     *
     * @return the current value of {@code _binaryDataLimit}
     */
    public int getBinaryDataLimit() {
        return _binaryDataLimit;
    }

    /**
     * Returns the maximum message size held in {@code _maxMessageSize}.
     *
     * @return the current value of {@code _maxMessageSize}
     */
    public long getMaxMessageSize() {
        return _maxMessageSize;
    }

    /**
     * Returns the reference object held in {@code _reference}.
     *
     * @return the current value of {@code _reference}
     */
    public Object getReference() {
        return _reference;
    }

    /**
     * Returns the close-when-no-route flag held in {@code _closeWhenNoRoute}.
     *
     * @return the current value of {@code _closeWhenNoRoute}
     */
    public boolean isCloseWhenNoRoute() {
        return _closeWhenNoRoute;
    }

    /**
     * Reports whether message compression may be used on this connection.
     *
     * @return {@code true} only when {@code _compressionSupported} is set and the broker
     *         has message compression enabled
     */
    public boolean isCompressionSupported() {
        if (!_compressionSupported) {
            // The broker is not consulted when the connection-level flag is off.
            return false;
        }
        return getBroker().isMessageCompressionEnabled();
    }

    /**
     * Returns the message compression threshold held in {@code _messageCompressionThreshold}.
     *
     * @return the current value of {@code _messageCompressionThreshold}
     */
    public int getMessageCompressionThreshold() {
        return _messageCompressionThreshold;
    }

    /**
     * Obtains the subject creator from the port's authentication provider, telling it
     * whether the transport is secure.
     *
     * @return the subject creator for this connection's port and transport
     */
    private SubjectCreator getSubjectCreator() {
        return getPort()
                .getAuthenticationProvider()
                .getSubjectCreator(getTransport().isSecure());
    }

    /**
     * Returns the event logger to use for this connection.
     *
     * @return the virtual host's event logger when a virtual host is set, otherwise the
     *         broker's event logger
     */
    public EventLogger getEventLogger() {
        VirtualHostImpl host = (VirtualHostImpl)getVirtualHost();
        return host != null ? host.getEventLogger() : getBroker().getEventLogger();
    }

    /**
     * Returns the method processor for the given channel.
     *
     * <p>The connection must be in the {@code OPEN} state. When the channel exists it is
     * returned itself. When it does not, a dynamic proxy is returned instead: any call to a
     * method whose name starts with {@code receive} closes the connection with
     * {@code CHANNEL_ERROR}, {@code ignoreAllButCloseOk} answers {@code false}, and every
     * other method returns {@code null}.
     *
     * @param channelId the id of the channel whose processor is wanted
     * @return the channel, or a proxy that rejects frames for the unknown channel
     */
    public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) {
        this.assertState(ConnectionState.OPEN);
        // Declared with the interface type so that both the channel and the proxy are assignable.
        ServerChannelMethodProcessor channelMethodProcessor = this.getChannel(channelId);
        if (channelMethodProcessor == null) {
            channelMethodProcessor = (ServerChannelMethodProcessor)Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(), new Class[]{ServerChannelMethodProcessor.class}, new InvocationHandler(){

                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (method.getName().startsWith("receive")) {
                        AMQPConnection_0_8.this.sendConnectionClose(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId, channelId);
                        return null;
                    }
                    if (method.getName().equals("ignoreAllButCloseOk")) {
                        return false;
                    }
                    return null;
                }
            });
        }
        return channelMethodProcessor;
    }

    /**
     * Handles a heartbeat frame from the peer; the only action taken here is a debug log entry.
     */
    public void receiveHeartbeat() {
        if (!_logger.isDebugEnabled()) {
            return;
        }
        _logger.debug("RECV Heartbeat");
    }

    /**
     * Handles the protocol header sent by the peer by passing it to
     * {@code protocolInitiationReceived}.
     *
     * @param protocolInitiation the protocol header received
     */
    public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV ProtocolHeader [" + protocolInitiation + " ]");
        }
        protocolInitiationReceived(protocolInitiation);
    }

    /**
     * Records the class id and method id of the method currently being handled.
     *
     * @param classId  the class id to store in {@code _currentClassId}
     * @param methodId the method id to store in {@code _currentMethodId}
     */
    public void setCurrentMethod(int classId, int methodId) {
        _currentClassId = classId;
        _currentMethodId = methodId;
    }

    /**
     * Reports whether incoming methods other than CloseOk should be ignored.
     *
     * @return the value of {@code isClosing()}
     */
    public boolean ignoreAllButCloseOk() {
        return isClosing();
    }

    /**
     * Reports whether work has been signalled since the flag was last cleared.
     *
     * @return the current value of the {@code _stateChanged} flag
     */
    public boolean hasWork() {
        return _stateChanged.get();
    }

    /**
     * Signals that there is work to do: sets the {@code _stateChanged} flag and, if a work
     * listener is registered, invokes it with this connection.
     */
    public void notifyWork() {
        _stateChanged.set(true);
        Action<ProtocolEngine> workListener = _workListener.get();
        if (workListener == null) {
            return;
        }
        workListener.performAction((Object)this);
    }

    /**
     * Clears the {@code _stateChanged} flag that {@code notifyWork()} sets.
     */
    public void clearWork() {
        _stateChanged.set(false);
    }

    /**
     * Registers the listener that {@code notifyWork()} invokes.
     *
     * @param listener the listener to store; {@code notifyWork()} skips a {@code null} listener
     */
    public void setWorkListener(Action<ProtocolEngine> listener) {
        _workListener.set(listener);
    }

    /**
     * Returns an iterator over the pending work of this connection.
     *
     * @return a new {@code ProcessPendingIterator} when called on the IO thread, otherwise
     *         an empty iterator
     */
    public Iterator<Runnable> processPendingIterator() {
        if (isIOThread()) {
            return new ProcessPendingIterator();
        }
        return Collections.emptyIterator();
    }

    /**
     * Iterator that hands out the connection's pending work as {@code Runnable} tasks.
     *
     * <p>Session work is served first: the sessions are visited cyclically until each has
     * been dropped from the snapshot. Only when the snapshot is empty are tasks taken from
     * the connection's {@code _asyncTaskList}.
     */
    private class ProcessPendingIterator
    implements Iterator<Runnable> {
        // Snapshot of the sessions taken at construction; entries are dropped as they report no more pending work.
        private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
        // Cursor over the snapshot; recreated whenever it reaches the end.
        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;

        private ProcessPendingIterator() {
            this._sessionsWithPending = new ArrayList<AMQChannel>(AMQPConnection_0_8.this.getSessionModels());
            this._sessionIterator = this._sessionsWithPending.iterator();
        }

        /**
         * @return {@code true} while any session remains in the snapshot or the async task list is non-empty
         */
        @Override
        public boolean hasNext() {
            return !this._sessionsWithPending.isEmpty() || !AMQPConnection_0_8.this._asyncTaskList.isEmpty();
        }

        /**
         * Returns the next unit of work: a task that processes the next session's pending
         * work, or, once no sessions remain, a task that runs the next queued async action.
         *
         * @throws NoSuchElementException if there are neither sessions nor async tasks left
         */
        @Override
        public Runnable next() {
            if (!this._sessionsWithPending.isEmpty()) {
                // Wrap around so the remaining sessions are visited again.
                if (!this._sessionIterator.hasNext()) {
                    this._sessionIterator = this._sessionsWithPending.iterator();
                }
                final AMQSessionModel<?> session = this._sessionIterator.next();
                return new Runnable(){

                    @Override
                    public void run() {
                        // remove() acts on the element most recently returned by the shared cursor.
                        // NOTE(review): assumes this task runs before next() is called again - confirm with the caller.
                        if (!session.processPending()) {
                            ProcessPendingIterator.this._sessionIterator.remove();
                        }
                    }
                };
            }
            if (!AMQPConnection_0_8.this._asyncTaskList.isEmpty()) {
                // The action is taken off the queue here, not when the task runs.
                final Action asyncAction = (Action)AMQPConnection_0_8.this._asyncTaskList.poll();
                return new Runnable(){

                    @Override
                    public void run() {
                        asyncAction.performAction((Object)AMQPConnection_0_8.this);
                    }
                };
            }
            throw new NoSuchElementException();
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * {@code ClientDeliveryMethod} that writes deliveries for one fixed channel through the
     * connection's protocol output converter.
     */
    public final class WriteDeliverMethod
    implements ClientDeliveryMethod {
        // Channel id passed to every writeDeliver call made by this instance.
        private final int _channelId;

        /**
         * @param channelId the channel id that deliveries are written for
         */
        public WriteDeliverMethod(int channelId) {
            _channelId = channelId;
        }

        /**
         * Writes the message to the client, using the consumer's name as the consumer tag,
         * and registers the delivery with the connection.
         *
         * @return the size reported by {@code writeDeliver}
         */
        @Override
        public long deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) {
            AMQShortString consumerTag = new AMQShortString(sub.getName());
            long deliveredSize = _protocolOutputConverter.writeDeliver(message, props, _channelId, deliveryTag, consumerTag);
            registerMessageDelivered(deliveredSize);
            return deliveredSize;
        }
    }

    /**
     * States of the connection handshake, as checked by {@code assertState}.
     */
    static enum ConnectionState {
        /** Not assigned or checked in this part of the class; presumably the state before negotiation starts - confirm. */
        INIT,
        /** Required by {@code receiveConnectionStartOk}. */
        AWAIT_START_OK,
        /** Entered after a ConnectionSecure challenge is written; required by {@code receiveConnectionSecureOk}. */
        AWAIT_SECURE_OK,
        /** Entered after ConnectionTune is written; required by {@code receiveConnectionTuneOk}. */
        AWAIT_TUNE_OK,
        /** Entered by {@code receiveConnectionTuneOk}; required by {@code receiveConnectionOpen}. */
        AWAIT_OPEN,
        /** Entered after ConnectionOpenOk is written; required before channels can be opened. */
        OPEN
    }
}

