/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v1_0;

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.security.auth.Subject;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
import org.apache.qpid.server.protocol.v1_0.ConnectionState;
import org.apache.qpid.server.protocol.v1_0.IdentifiedTransaction;
import org.apache.qpid.server.protocol.v1_0.SASLEndpoint;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.UnknownTransactionException;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPConnection_1_0Impl
extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
implements DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
SASLEndpoint,
AMQPConnection_1_0<AMQPConnection_1_0Impl> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
    /** Logger used for frame-level tracing (see the RECV messages emitted by this class). */
    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.protocol.frame");
    private final AtomicBoolean _stateChanged = new AtomicBoolean();
    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
    /** Protocol header bytes: ASCII "AMQP" followed by 3, 1, 0, 0. */
    private static final byte[] SASL_HEADER = new byte[]{65, 77, 81, 80, 3, 1, 0, 0};
    /** Protocol header bytes: ASCII "AMQP" followed by 0, 1, 0, 0. */
    private static final byte[] AMQP_HEADER = new byte[]{65, 77, 81, 80, 0, 1, 0, 0};
    private final FrameWriter _frameWriter;
    private ProtocolHandler _frameHandler;
    private volatile boolean _transportBlockedForWriting;
    /** Set once SASL authentication has succeeded; consulted by processSaslResponse on later responses. */
    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
    /** Read under {@link #_blockingLock} when a new session is registered in receiveBegin. */
    private boolean _blocking;
    private final Object _blockingLock = new Object();
    private List<Symbol> _offeredCapabilities;
    /** Policy taken from the peer's open properties, or REFUSE_CONNECTION when the capability is desired without one. */
    private SoleConnectionEnforcementPolicy _soleConnectionEnforcementPolicy;
    private static final int CONNECTION_CONTROL_CHANNEL = 0;
    private final SubjectCreator _subjectCreator;
    /** Highest usable channel number; negotiated in receiveOpen. */
    private int _channelMax = 0;
    /** Maximum frame size; renegotiated in receiveOpen. */
    private int _maxFrameSize = 4096;
    private String _remoteContainerId;
    private SocketAddress _remoteAddress;
    /** Sessions indexed by the channel the broker sends on. */
    private Session_1_0[] _sendingSessions;
    /** Sessions indexed by the channel the peer sends on. */
    private Session_1_0[] _receivingSessions;
    private volatile boolean _closedForOutput;
    /** Computed in the constructor as 1000 times the port's heartbeat delay. */
    private final long _incomingIdleTimeout;
    /** Taken from the idle-time-out field of the peer's open performative, when present. */
    private volatile long _outgoingIdleTimeout;
    private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
    private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer().registerTransactionLayer().registerSecurityLayer().registerExtensionSoleconnLayer();
    private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
    private volatile boolean _saslComplete;
    private volatile SaslNegotiator _saslNegotiator;
    /** Host name requested by the peer (sasl-init or open), falling back to the network's selected host. */
    private String _localHostname;
    /** Smallest non-zero idle timeout accepted from a peer in receiveOpen. */
    private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
    private Set<Symbol> _remoteDesiredCapabilities;
    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
    private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<>());
    private final Object _reference = new Object();
    private final Queue<Action<? super ConnectionHandler>> _asyncTaskList = new ConcurrentLinkedQueue<>();
    private final Set<AMQPSession<?, ?>> _sessionsWithWork = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private volatile ServerTransaction[] _openTransactions = new ServerTransaction[16];
    private volatile boolean _sendSaslFinalChallengeAsChallenge;
    private volatile String _closeCause;

    /**
     * Creates a connection bound to the given network connection and port.
     *
     * <p>Besides delegating to the superclass, this obtains the subject creator from the port,
     * publishes the offered capabilities, records the remote address, derives the incoming idle
     * timeout from the port's heartbeat delay and creates the frame writer.
     */
    AMQPConnection_1_0Impl(Broker<?> broker, ServerNetworkConnection network, AmqpPort<?> port, Transport transport, long id, AggregateTicker aggregateTicker) {
        super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
        _subjectCreator = port.getSubjectCreator(transport.isSecure(), network.getSelectedHost());

        // Offered in this order: anonymous relay, shared subscriptions, sole connection.
        final List<Symbol> capabilities = new ArrayList<>(Arrays.asList(
                ANONYMOUS_RELAY,
                SHARED_SUBSCRIPTIONS,
                SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER));
        setOfferedCapabilities(capabilities);

        setRemoteAddress(network.getRemoteAddress());
        // The heartbeat delay is multiplied by 1000 to obtain the timeout value.
        _incomingIdleTimeout = port.getHeartbeatDelay() * 1000L;
        _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
    }

    /**
     * Runs the superclass open logic and then caches the
     * {@code connection.sendSaslFinalResponseAsChallenge} context value.
     */
    protected void onOpen() {
        super.onOpen();
        final Boolean finalResponseAsChallenge = (Boolean) getContextValue(Boolean.class, "connection.sendSaslFinalResponseAsChallenge");
        _sendSaslFinalChallengeAsChallenge = finalResponseAsChallenge;
    }

    /**
     * Handles the peer's sasl-init frame.
     *
     * <p>Records the requested host name (falling back to the network's selected host), then either
     * starts SASL negotiation with the requested mechanism or reports a SASL error when the
     * mechanism is not among those offered by the authentication provider.
     *
     * @param saslInit the received sasl-init frame body
     */
    @Override
    public void receiveSaslInit(SaslInit saslInit) {
        assertState(ConnectionState.AWAIT_SASL_INIT);

        final String requestedHost = saslInit.getHostname();
        if (requestedHost != null && !requestedHost.trim().isEmpty()) {
            _localHostname = requestedHost;
        } else {
            final String selectedHost = getNetwork().getSelectedHost();
            if (selectedHost != null) {
                _localHostname = selectedHost;
            }
        }

        final String mechanism = saslInit.getMechanism().toString();
        final Binary initialResponse = saslInit.getInitialResponse();
        // A missing initial response is treated as an empty one.
        final byte[] response = initialResponse != null ? initialResponse.getArray() : new byte[0];

        final boolean mechanismOffered = _subjectCreator.getAuthenticationProvider()
                .getAvailableMechanisms(getTransport().isSecure())
                .contains(mechanism);
        if (mechanismOffered) {
            _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, (SaslSettings) this);
            processSaslResponse(response);
        } else {
            handleSaslError();
        }
    }

    /**
     * Handles a sasl-response frame by feeding its payload (or an empty array when absent)
     * into the ongoing SASL negotiation.
     *
     * @param saslResponse the received sasl-response frame body
     */
    @Override
    public void receiveSaslResponse(SaslResponse saslResponse) {
        assertState(ConnectionState.AWAIT_SASL_RESPONSE);
        final Binary payload = saslResponse.getResponse();
        final byte[] response;
        if (payload == null) {
            response = new byte[0];
        } else {
            response = payload.getArray();
        }
        processSaslResponse(response);
    }

    /**
     * Treats a received sasl-mechanisms frame as unexpected: the event is logged and the SASL
     * exchange is closed with failure.
     */
    @Override
    public void receiveSaslMechanisms(SaslMechanisms saslMechanisms) {
        final Object logSubject = getLogSubject();
        LOGGER.info("{} : Unexpected frame sasl-mechanisms", logSubject);
        closeSaslWithFailure();
    }

    /**
     * Treats a received sasl-challenge frame as unexpected: the event is logged and the SASL
     * exchange is closed with failure.
     */
    @Override
    public void receiveSaslChallenge(SaslChallenge saslChallenge) {
        final Object logSubject = getLogSubject();
        LOGGER.info("{} : Unexpected frame sasl-challenge", logSubject);
        closeSaslWithFailure();
    }

    /**
     * Treats a received sasl-outcome frame as unexpected: the event is logged and the SASL
     * exchange is closed with failure.
     */
    @Override
    public void receiveSaslOutcome(SaslOutcome saslOutcome) {
        final Object logSubject = getLogSubject();
        LOGGER.info("{} : Unexpected frame sasl-outcome", logSubject);
        closeSaslWithFailure();
    }

    /**
     * Advances the SASL exchange with the given client response.
     *
     * <p>If no successful result has been recorded yet, the response is passed to the subject
     * creator for authentication. Depending on the resulting status this sends a further challenge,
     * sends a successful outcome (moving on to await the AMQP header), or reports a SASL error.
     *
     * @param response bytes supplied by the client; {@code null} is treated as empty
     */
    private void processSaslResponse(byte[] response) {
        SubjectAuthenticationResult result = _successfulAuthenticationResult;
        byte[] challenge = null;
        if (result == null) {
            final byte[] clientResponse = response == null ? new byte[0] : response;
            result = _subjectCreator.authenticate(_saslNegotiator, clientResponse);
            challenge = result.getChallenge();
        }

        if (result.getStatus() != AuthenticationResult.AuthenticationStatus.SUCCESS) {
            if (result.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE) {
                continueSaslNegotiation(challenge);
            } else {
                handleSaslError();
            }
            return;
        }

        final boolean hasFinalChallenge = challenge != null && challenge.length != 0;
        _successfulAuthenticationResult = result;

        if (_sendSaslFinalChallengeAsChallenge && hasFinalChallenge) {
            // Deliver the final server data as a regular challenge; the outcome follows
            // once the client responds again.
            continueSaslNegotiation(challenge);
            return;
        }

        setSubject(_successfulAuthenticationResult.getSubject());
        final SaslOutcome success = new SaslOutcome();
        success.setCode(SaslCode.OK);
        if (hasFinalChallenge) {
            success.setAdditionalData(new Binary(challenge));
        }
        send(new SASLFrame(success));
        _saslComplete = true;
        _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
        disposeSaslNegotiator();
    }

    /**
     * Sends the given bytes to the peer as a sasl-challenge and waits for the next sasl-response.
     *
     * @param challenge challenge bytes to send
     */
    private void continueSaslNegotiation(byte[] challenge) {
        final SaslChallenge frameBody = new SaslChallenge();
        final Binary payload = new Binary(challenge);
        frameBody.setChallenge(payload);
        send(new SASLFrame(frameBody));
        _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
    }

    /**
     * Sends a sasl-outcome with code {@code AUTH} to the peer and then closes the SASL exchange
     * with failure.
     */
    private void handleSaslError() {
        final SaslOutcome failure = new SaslOutcome();
        failure.setCode(SaslCode.AUTH);
        final SASLFrame frame = new SASLFrame(failure);
        send(frame);
        _saslComplete = true;
        closeSaslWithFailure();
    }

    /**
     * Marks SASL as complete, disposes the negotiator, moves the connection to {@code CLOSED}
     * and schedules the close ticker.
     */
    private void closeSaslWithFailure() {
        _saslComplete = true;
        disposeSaslNegotiator();
        _connectionState = ConnectionState.CLOSED;
        addCloseTicker();
    }

    /**
     * Disposes the current SASL negotiator, if any, and clears the reference.
     */
    private void disposeSaslNegotiator() {
        if (_saslNegotiator != null) {
            _saslNegotiator.dispose();
        }
        // Cleared unconditionally so the field is null afterwards.
        _saslNegotiator = null;
    }

    /**
     * Sets this connection's subject to one created by the subject creator for the given user
     * (via {@code createSubjectWithGroups}).
     *
     * @param user the principal to build the subject from
     */
    private void setUserPrincipal(Principal user) {
        final Subject subjectWithGroups = _subjectCreator.createSubjectWithGroups(user);
        setSubject(subjectWithGroups);
    }

    /**
     * @return the incoming idle timeout fixed when this connection was constructed
     */
    @Override
    public long getIncomingIdleTimeout() {
        return _incomingIdleTimeout;
    }

    /**
     * @return the outgoing idle timeout; it is taken from the peer's open performative when
     *         one was supplied there
     */
    @Override
    public long getOutgoingIdleTimeout() {
        return _outgoingIdleTimeout;
    }

    /**
     * Routes an attach performative to the session mapped to the channel, or closes the
     * connection with {@code INVALID_FIELD} when no session is associated with it.
     *
     * @param channel the channel the frame arrived on
     * @param attach  the attach performative
     */
    @Override
    public void receiveAttach(int channel, Attach attach) {
        assertState(ConnectionState.OPENED);
        final Session_1_0 target = getSession(channel);
        if (target == null) {
            closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
            return;
        }
        target.receiveAttach(attach);
    }

    /**
     * Dispatches a batch of decoded frames.
     *
     * <p>Frames whose channel maps to a receiving session are processed inside that session's
     * access control context; consecutive frames for the same channel are handled within a single
     * privileged call. Frames for channels with no session are dispatched directly.
     *
     * <p>If processing terminates abnormally, every not-yet-consumed {@link Transfer} in the batch
     * is disposed.
     *
     * @param channelFrameBodies the frames to process, in arrival order
     */
    @Override
    public void receive(List<ChannelFrameBody> channelFrameBodies) {
        if (channelFrameBodies.isEmpty()) {
            return;
        }
        final PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator());
        boolean cleanExit = false;
        try {
            while (itr.hasNext()) {
                final ChannelFrameBody channelFrameBody = itr.next();
                final int frameChannel = channelFrameBody.getChannel();
                final Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length
                        ? null
                        : _receivingSessions[frameChannel];
                if (session == null) {
                    received(frameChannel, channelFrameBody.getFrameBody());
                    continue;
                }
                final AccessControlContext context = session.getAccessControllerContext();
                // The explicit PrivilegedAction cast is required: a bare lambda is ambiguous between
                // the PrivilegedAction and PrivilegedExceptionAction overloads of doPrivileged.
                AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                    ChannelFrameBody channelFrame = channelFrameBody;
                    boolean nextIsSameChannel;
                    do {
                        received(frameChannel, channelFrame.getFrameBody());
                        // Keep consuming while the following frame targets the same channel.
                        nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel();
                        if (nextIsSameChannel) {
                            channelFrame = itr.next();
                        }
                    } while (nextIsSameChannel);
                    return null;
                }, context);
            }
            cleanExit = true;
        } finally {
            if (!cleanExit) {
                // Abnormal exit: dispose the transfers that were never handed over for processing.
                while (itr.hasNext()) {
                    final Object frameBody = itr.next().getFrameBody();
                    if (frameBody instanceof Transfer) {
                        ((Transfer) frameBody).dispose();
                    }
                }
            }
        }
    }

    /**
     * Dispatches a single decoded frame body received on the given channel.
     *
     * <p>A channel above the connection's channel-max results in a framing error. Otherwise the
     * frame is trace-logged and invoked against this connection; values that are neither
     * {@link FrameBody} nor {@link SaslFrameBody} are ignored.
     *
     * @param channel channel the frame arrived on
     * @param val     decoded frame body
     */
    private void received(int channel, Object val) {
        final int channelMax = getChannelMax();
        if (channel > channelMax) {
            final String description = String.format("specified channel %d larger than maximum channel %d", channel, channelMax);
            handleError(new Error(ConnectionError.FRAMING_ERROR, description));
            return;
        }

        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, val);

        if (val instanceof FrameBody) {
            final FrameBody frameBody = (FrameBody) val;
            frameBody.invoke(channel, this);
            return;
        }
        if (val instanceof SaslFrameBody) {
            final SaslFrameBody saslFrameBody = (SaslFrameBody) val;
            saslFrameBody.invoke(channel, this);
        }
    }

    /**
     * Handles a close performative from the peer according to the current connection state.
     *
     * <ul>
     *   <li>Before the AMQP connection is established: throws.</li>
     *   <li>{@code AWAIT_OPEN}: ends sessions and closes the connection with {@code CONNECTION_FORCED}.</li>
     *   <li>{@code OPENED}: ends sessions, logs any peer-supplied error, replies with a close and
     *       schedules the close ticker.</li>
     *   <li>{@code CLOSE_SENT}: ends sessions and completes the orderly close.</li>
     *   <li>{@code CLOSE_RECEIVED} / {@code CLOSED}: ignored.</li>
     * </ul>
     *
     * @param channel channel the frame arrived on
     * @param close   the close performative
     * @throws ConnectionScopedRuntimeException if the AMQP connection has not been established
     * @throws ServerScopedRuntimeException     if the connection is in an unrecognised state
     */
    @Override
    public void receiveClose(int channel, Close close) {
        switch (_connectionState) {
            case AWAIT_AMQP_OR_SASL_HEADER:
            case AWAIT_SASL_INIT:
            case AWAIT_SASL_RESPONSE:
            case AWAIT_AMQP_HEADER:
                throw new ConnectionScopedRuntimeException("Received unexpected close when AMQP connection has not been established.");

            case AWAIT_OPEN:
                closeReceived();
                closeConnection(ConnectionError.CONNECTION_FORCED, "Connection close sent before connection was opened");
                break;

            case OPENED: {
                _connectionState = ConnectionState.CLOSE_RECEIVED;
                closeReceived();
                final Error peerError = close.getError();
                if (peerError != null) {
                    final ErrorCondition condition = peerError.getCondition();
                    final Symbol conditionSymbol = condition != null ? condition.getValue() : null;
                    LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(), conditionSymbol, peerError.getDescription());
                }
                sendClose(new Close());
                _connectionState = ConnectionState.CLOSED;
                _orderlyClose.set(true);
                addCloseTicker();
                break;
            }

            case CLOSE_SENT:
                closeReceived();
                _connectionState = ConnectionState.CLOSED;
                _orderlyClose.set(true);
                break;

            case CLOSE_RECEIVED:
            case CLOSED:
                // Already closing or closed: nothing further to do.
                break;

            default:
                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
        }
    }

    /**
     * Signals a remote end to every session of this connection, each within that session's
     * access control context.
     */
    private void closeReceived() {
        // Work on a snapshot of the session collection; presumably ending a session can remove
        // it from _sessions (see sessionEnded) while this loop is running.
        final List<Session_1_0> snapshot = new ArrayList<>(_sessions);
        for (final Session_1_0 session : snapshot) {
            // The explicit PrivilegedAction cast is required: a bare lambda is ambiguous between
            // the PrivilegedAction and PrivilegedExceptionAction overloads of doPrivileged.
            AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                session.remoteEnd(new End());
                return null;
            }, session.getAccessControllerContext());
        }
    }

    /**
     * @return the described-type registry created together with this connection
     */
    @Override
    public AMQPDescribedTypeRegistry getDescribedTypeRegistry() {
        return _describedTypeRegistry;
    }

    /**
     * @return the section decoder registry exposed by this connection's described-type registry
     */
    @Override
    public SectionDecoderRegistry getSectionDecoderRegistry() {
        final AMQPDescribedTypeRegistry registry = _describedTypeRegistry;
        return registry.getSectionDecoderRegistry();
    }

    /**
     * @return {@code true} when the connection state is {@code CLOSED} or {@code CLOSE_RECEIVED}
     */
    @Override
    public boolean isClosed() {
        if (_connectionState == ConnectionState.CLOSED) {
            return true;
        }
        return _connectionState == ConnectionState.CLOSE_RECEIVED;
    }

    /**
     * @return {@code true} when the connection state is {@code CLOSED}, {@code CLOSE_RECEIVED}
     *         or {@code CLOSE_SENT}
     */
    public boolean isClosing() {
        if (_connectionState == ConnectionState.CLOSED) {
            return true;
        }
        if (_connectionState == ConnectionState.CLOSE_RECEIVED) {
            return true;
        }
        return _connectionState == ConnectionState.CLOSE_SENT;
    }

    /**
     * @return {@code true} when the connection state is {@code CLOSE_RECEIVED} or {@code CLOSED}
     */
    @Override
    public boolean closedForInput() {
        if (_connectionState == ConnectionState.CLOSE_RECEIVED) {
            return true;
        }
        return _connectionState == ConnectionState.CLOSED;
    }

    /**
     * Removes the given session from this connection's session collection.
     *
     * @param session the session that has ended
     */
    @Override
    public void sessionEnded(Session_1_0 session) {
        _sessions.remove(session);
    }

    /**
     * Reacts to the underlying connection being closed: unless input is already closed, the
     * connection moves to {@code CLOSED}, output is closed and all sessions are ended.
     */
    private void inputClosed() {
        if (closedForInput()) {
            return;
        }
        FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
        _connectionState = ConnectionState.CLOSED;
        closeSender();
        closeReceived();
    }

    /**
     * Marks this connection as closed for output.
     */
    private void closeSender() {
        final boolean closed = true;
        setClosedForOutput(closed);
    }

    /**
     * @return the container id taken from the peer's open performative, or {@code null} if it
     *         has not been set yet
     */
    @Override
    public String getRemoteContainerId() {
        return _remoteContainerId;
    }

    /**
     * @return {@code true} only while the connection state is {@code OPENED}
     */
    public boolean isOpen() {
        return ConnectionState.OPENED == _connectionState;
    }

    /**
     * Sends an end performative on the given channel and optionally frees the sending channel.
     *
     * @param channel the sending channel
     * @param end     the end performative to send
     * @param remove  whether to clear the sending-session slot for the channel afterwards
     */
    @Override
    public void sendEnd(int channel, End end, boolean remove) {
        sendFrame(channel, end);
        if (!remove) {
            return;
        }
        _sendingSessions[channel] = null;
    }

    /**
     * Handles an end performative: the receiving channel slot is cleared and the session is
     * notified. An unmapped channel closes the connection.
     *
     * @param channel channel the frame arrived on
     * @param end     the end performative
     */
    @Override
    public void receiveEnd(int channel, End end) {
        assertState(ConnectionState.OPENED);
        final Session_1_0 endingSession = getSession(channel);
        if (endingSession == null) {
            closeConnectionWithInvalidChannel(channel, end);
            return;
        }
        // Free the slot before notifying the session.
        _receivingSessions[channel] = null;
        endingSession.receiveEnd(end);
    }

    /**
     * Closes the connection with {@code INVALID_FIELD} because a frame arrived on a channel that
     * is not mapped to a session.
     *
     * @param channel the unmapped channel
     * @param frame   the offending frame; its simple class name, lower-cased, appears in the error text
     */
    private void closeConnectionWithInvalidChannel(int channel, FrameBody frame) {
        // Locale.ROOT keeps the error text independent of the JVM's default locale.
        final String frameName = frame.getClass().getSimpleName().toLowerCase(java.util.Locale.ROOT);
        closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frameName, channel));
    }

    /**
     * Routes a disposition performative to the session mapped to the channel; an unmapped
     * channel closes the connection.
     *
     * @param channel     channel the frame arrived on
     * @param disposition the disposition performative
     */
    @Override
    public void receiveDisposition(int channel, Disposition disposition) {
        assertState(ConnectionState.OPENED);
        final Session_1_0 target = getSession(channel);
        if (target == null) {
            closeConnectionWithInvalidChannel(channel, disposition);
            return;
        }
        target.receiveDisposition(disposition);
    }

    /**
     * Handles a begin performative by creating a new session.
     *
     * <p>The connection is closed with a framing error when the peer supplies a remote-channel,
     * when the receiving channel is already in use, or when no sending channel is free.
     * Otherwise a session is created, mapped on both channels, answered with a begin of its own
     * and registered (and blocked, if the connection is currently blocking).
     *
     * @param receivingChannelId channel the begin arrived on
     * @param begin              the begin performative
     */
    @Override
    public void receiveBegin(int receivingChannelId, Begin begin) {
        assertState(ConnectionState.OPENED);

        if (begin.getRemoteChannel() != null) {
            closeConnection(ConnectionError.FRAMING_ERROR, "BEGIN received on channel " + receivingChannelId + " with given remote-channel " + begin.getRemoteChannel() + ". Since the broker does not spontaneously start channels, this must be an error.");
            return;
        }
        if (_receivingSessions[receivingChannelId] != null) {
            closeConnection(ConnectionError.FRAMING_ERROR, "BEGIN received on channel " + receivingChannelId + " which is already in use.");
            return;
        }

        final int sendingChannelId = getFirstFreeChannel();
        if (sendingChannelId == -1) {
            closeConnection(ConnectionError.FRAMING_ERROR, "BEGIN received on channel " + receivingChannelId + ". There are no free channels for the broker to respond on.");
            return;
        }

        final Session_1_0 newSession = new Session_1_0(this, begin, sendingChannelId, receivingChannelId, (Long) getContextValue(Long.class, "connection.sessionCreditWindowSize"));
        newSession.create();
        _receivingSessions[receivingChannelId] = newSession;
        _sendingSessions[sendingChannelId] = newSession;

        final Begin reply = new Begin();
        reply.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
        reply.setNextOutgoingId(newSession.getNextOutgoingId());
        reply.setOutgoingWindow(newSession.getOutgoingWindow());
        reply.setIncomingWindow(newSession.getIncomingWindow());
        sendFrame(sendingChannelId, reply);

        // Registration and the blocking check happen under the same lock.
        synchronized (_blockingLock) {
            _sessions.add(newSession);
            if (_blocking) {
                newSession.block();
            }
        }
    }

    /**
     * Finds the lowest sending channel without a session.
     *
     * @return the first free channel in {@code [0, _channelMax]}, or {@code -1} if all are taken
     */
    private int getFirstFreeChannel() {
        int candidate = 0;
        while (candidate <= _channelMax) {
            if (_sendingSessions[candidate] == null) {
                return candidate;
            }
            candidate++;
        }
        return -1;
    }

    /**
     * Closes the connection with the given error unless output has already been closed.
     *
     * @param error the error to report to the peer
     */
    @Override
    public void handleError(Error error) {
        if (_closedForOutput) {
            return;
        }
        closeConnection(error);
    }

    /**
     * Routes a transfer performative to the session mapped to the channel; an unmapped channel
     * closes the connection.
     *
     * @param channel  channel the frame arrived on
     * @param transfer the transfer performative
     */
    @Override
    public void receiveTransfer(int channel, Transfer transfer) {
        assertState(ConnectionState.OPENED);
        final Session_1_0 target = getSession(channel);
        if (target == null) {
            closeConnectionWithInvalidChannel(channel, transfer);
            return;
        }
        target.receiveTransfer(transfer);
    }

    /**
     * Routes a flow performative to the session mapped to the channel; an unmapped channel
     * closes the connection.
     *
     * @param channel channel the frame arrived on
     * @param flow    the flow performative
     */
    @Override
    public void receiveFlow(int channel, Flow flow) {
        assertState(ConnectionState.OPENED);
        final Session_1_0 target = getSession(channel);
        if (target == null) {
            closeConnectionWithInvalidChannel(channel, flow);
            return;
        }
        target.receiveFlow(flow);
    }

    /**
     * Handles the peer's open performative and negotiates the connection parameters.
     *
     * <p>In order, this: negotiates channel-max and max-frame-size against the port and broker
     * limits, records the remote container id and requested host name, captures the peer's idle
     * timeout, properties and desired capabilities, resolves the sole-connection enforcement
     * policy, validates the idle timeout, starts heartbeating and finally looks up the address
     * space for the requested host name.
     *
     * <p>The connection is closed (and the method returns early) when the enforcement policy
     * value is invalid, the requested idle timeout is below the supported minimum, or the host
     * name does not resolve to an address space.
     *
     * @param channel channel the frame arrived on
     * @param open    the open performative
     */
    @Override
    public void receiveOpen(int channel, Open open) {
        assertState(ConnectionState.AWAIT_OPEN);

        // channel-max is the smaller of the peer's value and the port limit; when the peer gives
        // none the port limit applies. The assignment must happen in both cases.
        final int channelLimit = getPort().getSessionCountLimit() - 1;
        _channelMax = open.getChannelMax() == null
                ? channelLimit
                : Math.min(open.getChannelMax().intValue(), channelLimit);
        if (_receivingSessions == null) {
            _receivingSessions = new Session_1_0[_channelMax + 1];
            _sendingSessions = new Session_1_0[_channelMax + 1];
        }

        // The frame size is capped at the broker's network buffer size.
        final int networkBufferSize = getBroker().getNetworkBufferSize();
        if (open.getMaxFrameSize() == null || open.getMaxFrameSize().longValue() > (long) networkBufferSize) {
            _maxFrameSize = networkBufferSize;
        } else {
            _maxFrameSize = open.getMaxFrameSize().intValue();
        }

        _remoteContainerId = open.getContainerId();

        if (open.getHostname() != null && !"".equals(open.getHostname().trim())) {
            _localHostname = open.getHostname();
        }
        // Parentheses make the existing operator precedence explicit (&& binds tighter than ||).
        if (_localHostname == null
            || ("".equals(_localHostname.trim()) && getNetwork().getSelectedHost() != null)) {
            _localHostname = getNetwork().getSelectedHost();
        }

        if (open.getIdleTimeOut() != null) {
            _outgoingIdleTimeout = open.getIdleTimeOut().longValue();
        }

        final Map<Symbol, Object> remoteProperties;
        if (open.getProperties() == null) {
            remoteProperties = Collections.emptyMap();
        } else {
            remoteProperties = Collections.unmodifiableMap(new LinkedHashMap<Symbol, Object>(open.getProperties()));
        }

        final Symbol[] desiredCapabilities = open.getDesiredCapabilities();
        if (desiredCapabilities == null) {
            _remoteDesiredCapabilities = Collections.emptySet();
        } else {
            _remoteDesiredCapabilities = Sets.newHashSet(desiredCapabilities);
        }

        // A key that is present but mapped to null is treated like an absent key.
        final Object product = remoteProperties.get(Symbol.valueOf("product"));
        if (product != null) {
            setClientProduct(product.toString());
        }
        final Object version = remoteProperties.get(Symbol.valueOf("version"));
        if (version != null) {
            setClientVersion(version.toString());
        }
        setClientId(_remoteContainerId);

        if (_remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER)) {
            if (remoteProperties.containsKey(SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY)) {
                try {
                    _soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.valueOf(remoteProperties.get(SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY));
                } catch (IllegalArgumentException e) {
                    closeConnection(AmqpError.INVALID_FIELD, e.getMessage());
                    return;
                }
            } else {
                // Capability desired without an explicit policy: refuse duplicate connections.
                _soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.REFUSE_CONNECTION;
            }
        }

        if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT) {
            closeConnection(ConnectionError.CONNECTION_FORCED, "Requested idle timeout of " + _outgoingIdleTimeout + " is too low. The minimum supported timeout is " + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
            return;
        }

        // Heartbeating is initialised with half of the peer's idle timeout.
        initialiseHeartbeating(_outgoingIdleTimeout / 2L, _incomingIdleTimeout);

        final NamedAddressSpace addressSpace = getPort().getAddressSpace(_localHostname);
        if (addressSpace == null) {
            closeConnection(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
        } else {
            receiveOpenInternal(addressSpace);
        }
    }

    /**
     * Completes processing of a received Open once the target address space is known.
     * <p>
     * The connection is closed instead of opened when the address space is not active, when the
     * connection's subject carries no authenticated principal, when
     * {@code authoriseCreateConnection} refuses the connection, or when an
     * {@link AccessControlException} / {@code VirtualHostUnavailableException} is raised.
     * When {@code registerConnection} returns {@code false} nothing further is done here; the
     * registration callback has already scheduled either a close of this connection or a retry.
     *
     * @param addressSpace the address space the connection is being opened against
     */
    private void receiveOpenInternal(NamedAddressSpace addressSpace) {
        if (!addressSpace.isActive()) {
            // Inactive address space: report either a redirect or a forced close.
            Error err = new Error();
            this.populateConnectionRedirect(addressSpace, err);
            this.closeConnection(err);
        } else if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject((Subject)this.getSubject()) == null) {
            this.closeConnection(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
        } else {
            try {
                // The callback applies the sole-connection enforcement policy against existing,
                // non-closing AMQP 1.0 connections that share the new connection's container name.
                boolean registerSucceeded = addressSpace.registerConnection((AMQPConnection)this, (existingConnections, newConnection) -> {
                    boolean proceedWithRegistration = true;
                    if (newConnection instanceof AMQPConnection_1_0Impl && !newConnection.isClosing()) {
                        ArrayList<ListenableFuture> rescheduleFutures = new ArrayList<ListenableFuture>();
                        for (AMQPConnection existingConnection : StreamSupport.stream(existingConnections.spliterator(), false).filter(con -> con instanceof AMQPConnection_1_0).filter(con -> !con.isClosing()).filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName())).collect(Collectors.toList())) {
                            Error error;
                            // The existing connection's policy wins; the new connection's policy is
                            // only consulted when the existing connection has none.
                            SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null;
                            if (((AMQPConnection_1_0Impl)existingConnection)._soleConnectionEnforcementPolicy != null) {
                                soleConnectionEnforcementPolicy = ((AMQPConnection_1_0Impl)existingConnection)._soleConnectionEnforcementPolicy;
                            } else if (((AMQPConnection_1_0Impl)newConnection)._soleConnectionEnforcementPolicy != null) {
                                soleConnectionEnforcementPolicy = ((AMQPConnection_1_0Impl)newConnection)._soleConnectionEnforcementPolicy;
                            }
                            if (SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy)) {
                                // Refuse the new connection: close it on its IO thread and stop looking
                                // at further existing connections.
                                this._properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
                                error = new Error(AmqpError.INVALID_FIELD, String.format("Connection closed due to sole-connection-enforcement-policy '%s'", soleConnectionEnforcementPolicy.toString()));
                                error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
                                newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl)newConnection).closeConnection(error));
                                proceedWithRegistration = false;
                                break;
                            }
                            // Any policy other than CLOSE_EXISTING (including none) leaves this
                            // existing connection untouched.
                            if (!SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy)) continue;
                            // Close the existing connection on its IO thread and remember the future so
                            // the open can be retried once the close has been carried out.
                            error = new Error(AmqpError.RESOURCE_LOCKED, String.format("Connection closed due to sole-connection-enforcement-policy '%s'", soleConnectionEnforcementPolicy.toString()));
                            error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true));
                            rescheduleFutures.add(existingConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl)existingConnection).closeConnection(error)));
                            proceedWithRegistration = false;
                        }
                        if (!rescheduleFutures.isEmpty()) {
                            // Re-run this method on the new connection's IO thread after all scheduled
                            // closes of existing connections have completed.
                            this.doAfter(Futures.allAsList(rescheduleFutures), () -> newConnection.doOnIOThreadAsync(() -> this.receiveOpenInternal(addressSpace)));
                        }
                    }
                    return proceedWithRegistration;
                });
                if (registerSucceeded) {
                    this.setAddressSpace(addressSpace);
                    if (!addressSpace.authoriseCreateConnection((AMQPConnection)this)) {
                        this.closeConnection(AmqpError.NOT_ALLOWED, "Connection refused");
                    } else {
                        switch (this._connectionState) {
                            case AWAIT_OPEN: {
                                // Normal path: answer the peer's Open with our own and move to OPENED.
                                this.sendOpen(this._channelMax, this._maxFrameSize);
                                this._connectionState = ConnectionState.OPENED;
                                break;
                            }
                            case CLOSE_SENT: 
                            case CLOSED: {
                                // Already closing or closed: nothing to send.
                                break;
                            }
                            default: {
                                throw new ConnectionScopedRuntimeException(String.format("Unexpected state %s during connection open.", new Object[]{this._connectionState}));
                            }
                        }
                    }
                }
            }
            catch (AccessControlException | VirtualHostUnavailableException e) {
                this.closeConnection(AmqpError.NOT_ALLOWED, e.getMessage());
            }
        }
    }

    /**
     * Fills {@code err} with the condition to report for an address space that is not accepting
     * this connection.
     * <p>
     * When the address space offers no redirect host the error becomes {@code CONNECTION_FORCED}
     * with a description naming the local hostname. Otherwise the error becomes {@code REDIRECT}
     * and its info map carries {@code network-host} and, when a positive port could be parsed,
     * {@code port}.
     * <p>
     * Accepted redirect host forms are {@code [ipv6]}, {@code [ipv6]:port}, {@code host:port} and
     * {@code host}. Hex digits in a bracketed IPv6 literal are matched case-insensitively, and a
     * port that cannot be parsed as an {@code int} is treated as absent in every form.
     *
     * @param addressSpace address space queried for its redirect host
     * @param err error object to populate
     */
    private void populateConnectionRedirect(NamedAddressSpace addressSpace, Error err) {
        String redirectHost = addressSpace.getRedirectHost(this.getPort());
        if (redirectHost == null) {
            err.setCondition(ConnectionError.CONNECTION_FORCED);
            err.setDescription("Virtual host '" + this._localHostname + "' is not active");
            return;
        }
        err.setCondition(ConnectionError.REDIRECT);
        String networkHost;
        String portText;
        if (redirectHost.matches("\\[[0-9a-fA-F:]+\\](:[0-9]+)?")) {
            // Bracketed IPv6 literal: strip the brackets, anything after "]:" is the port.
            int closingBracket = redirectHost.indexOf(']');
            networkHost = redirectHost.substring(1, closingBracket);
            portText = redirectHost.length() > closingBracket + 1 ? redirectHost.substring(closingBracket + 2) : null;
        } else if (redirectHost.contains(":")) {
            int lastColon = redirectHost.lastIndexOf(':');
            networkHost = redirectHost.substring(0, lastColon);
            portText = redirectHost.substring(lastColon + 1);
        } else {
            networkHost = redirectHost;
            portText = null;
        }
        int port = -1;
        if (portText != null) {
            try {
                port = Integer.parseInt(portText);
            }
            catch (NumberFormatException e) {
                // Unparseable or out-of-range port: advertise the host only.
                port = -1;
            }
        }
        Map<Symbol, Object> infoMap = new HashMap<Symbol, Object>();
        infoMap.put(Symbol.valueOf("network-host"), networkHost);
        if (port > 0) {
            infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
        }
        err.setInfo(infoMap);
    }

    /**
     * Handles a Detach frame received on the given channel.
     * <p>
     * The connection must be in the {@code OPENED} state. The frame is handed to the session
     * looked up for the channel; when no session is found the connection is closed through
     * {@code closeConnectionWithInvalidChannel}.
     *
     * @param channel channel on which the frame arrived
     * @param detach the received Detach performative
     */
    @Override
    public void receiveDetach(int channel, Detach detach) {
        assertState(ConnectionState.OPENED);
        Session_1_0 target = getSession(channel);
        if (target == null) {
            closeConnectionWithInvalidChannel(channel, detach);
            return;
        }
        target.receiveDetach(detach);
    }

    /**
     * Notifies every session of this connection that the transport state has changed.
     */
    private void transportStateChanged() {
        Iterator<? extends Session_1_0> sessions = _sessions.iterator();
        while (sessions.hasNext()) {
            sessions.next().transportStateChanged();
        }
    }

    /**
     * Closes this connection, reporting the supplied error to the peer.
     *
     * @param error error to carry in the Close performative
     */
    @Override
    public void close(Error error) {
        closeConnection(error);
    }

    /**
     * Records the remote socket address of this connection.
     *
     * @param remoteAddress address to store
     */
    private void setRemoteAddress(SocketAddress remoteAddress) {
        _remoteAddress = remoteAddress;
    }

    /**
     * Sets the capabilities that are offered in the Open sent by this connection.
     * The list is stored by reference, not copied.
     *
     * @param offeredCapabilities capabilities to offer
     */
    public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
        _offeredCapabilities = offeredCapabilities;
    }

    /**
     * Sets the flag consulted by {@code sendFrame} to decide whether frames may still be written.
     *
     * @param closed {@code true} to stop further frames from being sent
     */
    private void setClosedForOutput(boolean closed) {
        _closedForOutput = closed;
    }

    /**
     * Returns the local hostname recorded for this connection, falling back to the superclass
     * value when none has been recorded.
     *
     * @return the local hostname, or {@code super.getLocalFQDN()} when it is {@code null}
     */
    public String getLocalFQDN() {
        String localHostname = _localHostname;
        if (localHostname == null) {
            return super.getLocalFQDN();
        }
        return localHostname;
    }

    /**
     * @return the maximum frame size currently held by this connection
     */
    @Override
    public int getMaxFrameSize() {
        return _maxFrameSize;
    }

    /**
     * @return the highest channel number currently held by this connection
     */
    @Override
    public int getChannelMax() {
        return _channelMax;
    }

    /**
     * @return the reference object held by this connection
     */
    @Override
    public Object getReference() {
        return _reference;
    }

    /**
     * Runs the delete tasks and close-received handling, then deregisters this connection from
     * its address space. Deregistration happens even when the earlier steps throw.
     */
    private void endpointClosed() {
        try {
            performDeleteTasks();
            closeReceived();
        }
        finally {
            NamedAddressSpace registeredWith = getAddressSpace();
            if (registeredWith != null) {
                registeredWith.deregisterConnection((AMQPConnection)this);
            }
        }
    }

    /**
     * Closes the connection with an error built from the given condition and description.
     *
     * @param errorCondition condition to report
     * @param description human-readable description to report
     */
    private void closeConnection(ErrorCondition errorCondition, String description) {
        Error error = new Error(errorCondition, description);
        closeConnection(error);
    }

    /**
     * Closes the connection with the supplied error, acting according to the current state.
     * <ul>
     * <li>Before the AMQP header exchange has completed: throws, as no Close can be sent.</li>
     * <li>{@code AWAIT_OPEN}: sends an Open followed by the Close, marks the connection
     *     {@code CLOSED} and closes the sender.</li>
     * <li>{@code OPENED}: sends the Close, moves to {@code CLOSE_SENT} and adds a close ticker.</li>
     * <li>{@code CLOSE_RECEIVED}: sends the Close, moves to {@code CLOSED} and adds a close ticker.</li>
     * <li>{@code CLOSE_SENT} / {@code CLOSED}: nothing is sent.</li>
     * </ul>
     * The error description is recorded as the close cause in every case.
     *
     * @param error error to report to the peer
     * @throws ConnectionScopedRuntimeException if the connection is not yet fully established
     * @throws ServerScopedRuntimeException if the connection state is not recognised
     */
    private void closeConnection(Error error) {
        LOGGER.debug("Closing connection {} (state={}) due to {}", this, _connectionState, error);
        _closeCause = error.getDescription();
        Close closeFrame = new Close();
        closeFrame.setError(error);
        switch (_connectionState) {
            case AWAIT_AMQP_OR_SASL_HEADER:
            case AWAIT_SASL_INIT:
            case AWAIT_SASL_RESPONSE:
            case AWAIT_AMQP_HEADER:
                throw new ConnectionScopedRuntimeException("Connection is closed before being fully established: " + error.getDescription());
            case AWAIT_OPEN:
                // An Open must precede the Close on the wire.
                sendOpen(0, 0);
                sendClose(closeFrame);
                _connectionState = ConnectionState.CLOSED;
                getSender().close();
                break;
            case OPENED:
                sendClose(closeFrame);
                _connectionState = ConnectionState.CLOSE_SENT;
                addCloseTicker();
                break;
            case CLOSE_RECEIVED:
                sendClose(closeFrame);
                _connectionState = ConnectionState.CLOSED;
                addCloseTicker();
                break;
            case CLOSE_SENT:
            case CLOSED:
                break;
            default:
                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
        }
    }

    /**
     * Sends a frame, splitting the payload when it does not fit in a single frame.
     * <p>
     * When the payload exceeds the space left after the encoded body plus 9 bytes, the body is
     * cast to {@code Transfer}, flagged with {@code more = true}, and only the leading part of the
     * payload is sent; the payload's position is advanced past the part that was sent.
     *
     * @param channel channel to send on
     * @param body frame body
     * @param payload payload to attach, or {@code null} for a body-only frame
     * @return the number of payload bytes sent, {@code 0} when there was no payload, or
     *         {@code -1} when the connection is closed for output
     */
    @Override
    public int sendFrame(int channel, FrameBody body, QpidByteBuffer payload) {
        if (_closedForOutput) {
            return -1;
        }
        ValueWriter<FrameBody> bodyWriter = _describedTypeRegistry.getValueWriter(body);
        if (payload == null) {
            send(new TransportFrame(channel, body));
            return 0;
        }
        int encodedBodySize = bodyWriter.getEncodedSize();
        int room = _maxFrameSize - (encodedBodySize + 9);
        long pending = payload.remaining();
        if (pending <= (long)room) {
            send(new TransportFrame(channel, body, payload));
            return (int)pending;
        }
        // Payload does not fit: mark the transfer as continued and re-measure the body,
        // since setting the flag can change its encoded size.
        ((Transfer)body).setMore(Boolean.TRUE);
        bodyWriter = _describedTypeRegistry.getValueWriter(body);
        encodedBodySize = bodyWriter.getEncodedSize();
        room = _maxFrameSize - (encodedBodySize + 9);
        try (QpidByteBuffer firstPart = payload.view(0, room);){
            payload.position(payload.position() + room);
            send(new TransportFrame(channel, body, firstPart));
        }
        return room;
    }

    /**
     * Sends a frame that carries no payload.
     *
     * @param channel channel to send on
     * @param body frame body
     */
    @Override
    public void sendFrame(int channel, FrameBody body) {
        final QpidByteBuffer noPayload = null;
        sendFrame(channel, body, noPayload);
    }

    /**
     * @return the sender of the underlying network connection
     */
    public ByteBufferSender getSender() {
        return getNetwork().getSender();
    }

    /**
     * Sends a heartbeat frame to the peer.
     */
    public void writerIdle() {
        send(TransportFrame.HEARTBEAT);
    }

    /**
     * Logs an idle-close message and closes the network connection, running both steps as a
     * privileged action under this connection's access control context.
     */
    public void readerIdle() {
        AccessControlContext context = getAccessControllerContext();
        // The explicit cast selects the PrivilegedAction overload of doPrivileged.
        AccessController.doPrivileged((PrivilegedAction<Object>)() -> {
            getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
            getNetwork().close();
            return null;
        }, context);
    }

    /**
     * Intentionally does nothing in this implementation.
     */
    public void encryptedTransport() {
    }

    /**
     * @return the string form of the network connection's remote address
     */
    public String getAddress() {
        return String.valueOf(getNetwork().getRemoteAddress().toString());
    }

    /**
     * Consumes received bytes, dispatching on the current connection state until a pass over the
     * buffer consumes nothing.
     * <p>
     * While a protocol header is awaited, at least 8 bytes must be available before it is
     * processed; in the SASL, open, opened and close-sent states the bytes go to the frame
     * handler. In any other state nothing is consumed. {@code receivedComplete()} is always
     * invoked afterwards.
     *
     * @param msg buffer holding the received bytes
     * @throws ConnectionScopedRuntimeException wrapping any {@link IllegalArgumentException} or
     *         {@link IllegalStateException} raised during processing
     */
    protected void onReceive(QpidByteBuffer msg) {
        try {
            try {
                boolean progressed;
                do {
                    int before = msg.remaining();
                    switch (_connectionState) {
                        case AWAIT_AMQP_OR_SASL_HEADER:
                        case AWAIT_AMQP_HEADER:
                            if (before >= 8) {
                                processProtocolHeader(msg);
                            }
                            break;
                        case AWAIT_SASL_INIT:
                        case AWAIT_SASL_RESPONSE:
                        case AWAIT_OPEN:
                        case OPENED:
                        case CLOSE_SENT:
                            _frameHandler.parse(msg);
                            break;
                    }
                    // Go round again only when this pass actually consumed something.
                    progressed = msg.remaining() != before;
                } while (progressed);
            }
            finally {
                receivedComplete();
            }
        }
        catch (IllegalArgumentException | IllegalStateException failure) {
            throw new ConnectionScopedRuntimeException((Throwable)failure);
        }
    }

    /**
     * Calls {@code receivedComplete()} on every non-null receiving session, each under that
     * session's own access control context. Does nothing when no receiving-session table exists.
     */
    @Override
    public void receivedComplete() {
        if (_receivingSessions == null) {
            return;
        }
        for (Session_1_0 session : _receivingSessions) {
            if (session != null) {
                AccessControlContext sessionContext = session.getAccessControllerContext();
                // The explicit cast selects the PrivilegedAction overload of doPrivileged.
                AccessController.doPrivileged((PrivilegedAction<Object>)() -> {
                    session.receivedComplete();
                    return null;
                }, sessionContext);
            }
        }
    }

    /**
     * Reads an 8-byte protocol header from the buffer and reacts to it.
     * <ul>
     * <li>SASL header: echoes the SASL header, sends the available SASL mechanisms and moves to
     *     {@code AWAIT_SASL_INIT}. Throws if SASL has already completed.</li>
     * <li>AMQP header: when SASL has not completed, sets the user principal from the peer
     *     principal (EXTERNAL) or the anonymous principal (ANONYMOUS) where those mechanisms are
     *     available; then echoes the AMQP header and moves to {@code AWAIT_OPEN}.</li>
     * <li>Anything else: logs a warning, marks the connection {@code CLOSED} and closes the
     *     network.</li>
     * </ul>
     * Nothing is read when fewer than 8 bytes remain.
     *
     * @param msg buffer positioned at the protocol header
     */
    private void processProtocolHeader(QpidByteBuffer msg) {
        if (msg.remaining() < 8) {
            return;
        }
        byte[] received = new byte[8];
        msg.get(received);
        AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider();

        if (Arrays.equals(received, SASL_HEADER)) {
            if (_saslComplete) {
                throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
            }
            try (QpidByteBuffer saslHeader = QpidByteBuffer.wrap((byte[])SASL_HEADER);){
                getSender().send(saslHeader);
            }
            SaslMechanisms offered = new SaslMechanisms();
            ArrayList<Symbol> mechanismSymbols = new ArrayList<Symbol>();
            for (String name : authenticationProvider.getAvailableMechanisms(getTransport().isSecure())) {
                mechanismSymbols.add(Symbol.valueOf(name));
            }
            offered.setSaslServerMechanisms(mechanismSymbols.toArray(new Symbol[mechanismSymbols.size()]));
            send(new SASLFrame(offered));
            _connectionState = ConnectionState.AWAIT_SASL_INIT;
            _frameHandler = getFrameHandler(true);
            return;
        }

        if (!Arrays.equals(received, AMQP_HEADER)) {
            LOGGER.warn("{} : unknown AMQP header {}", (Object)getLogSubject(), (Object)Functions.str((byte[])received));
            _connectionState = ConnectionState.CLOSED;
            getNetwork().close();
            return;
        }

        if (!_saslComplete) {
            List mechanisms = authenticationProvider.getAvailableMechanisms(getTransport().isSecure());
            if (mechanisms.contains("EXTERNAL") && getNetwork().getPeerPrincipal() != null) {
                setUserPrincipal((Principal)new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
            } else if (mechanisms.contains("ANONYMOUS")) {
                setUserPrincipal((Principal)new AuthenticatedPrincipal(((AnonymousAuthenticationManager)authenticationProvider).getAnonymousPrincipal()));
            } else {
                LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", (Object)getLogSubject());
                _connectionState = ConnectionState.CLOSED;
                getNetwork().close();
                // NOTE(review): execution continues below, so the header is still echoed and the
                // state is overwritten with AWAIT_OPEN after the network close - confirm intended.
            }
        }
        try (QpidByteBuffer amqpHeader = QpidByteBuffer.wrap((byte[])AMQP_HEADER);){
            getSender().send(amqpHeader);
        }
        _connectionState = ConnectionState.AWAIT_OPEN;
        _frameHandler = getFrameHandler(false);
    }

    /**
     * Creates a frame handler bound to this connection and its described-type registry.
     *
     * @param sasl flag passed through to the {@code FrameHandler} constructor
     * @return a new frame handler
     */
    private FrameHandler getFrameHandler(boolean sasl) {
        ValueHandler valueHandler = new ValueHandler(getDescribedTypeRegistry());
        return new FrameHandler(valueHandler, this, sasl);
    }

    /**
     * Handles closure of the transport.
     * <p>
     * Runtime exceptions from {@code inputClosed()} are logged and not propagated.
     * {@code endpointClosed()} is then always run, and {@code markTransportClosed()} is always
     * run after it, even if the preceding step throws.
     */
    public void closed() {
        try {
            inputClosed();
        }
        catch (RuntimeException failure) {
            LOGGER.error("Exception while closing", (Throwable)failure);
        }
        finally {
            try {
                endpointClosed();
            }
            finally {
                markTransportClosed();
            }
        }
    }

    /**
     * Writes a frame through the frame writer, updating the last-write time and logging the frame
     * at debug level first.
     *
     * @param amqFrame frame to write
     * @throws OversizeFrameException if the written size exceeds {@code getMaxFrameSize()}; the
     *         check is made after the frame writer has been called
     */
    private void send(AMQFrame amqFrame) {
        updateLastWriteTime();
        FRAME_LOGGER.debug("SEND[{}|{}] : {}",
                           getNetwork().getRemoteAddress(),
                           amqFrame.getChannel(),
                           amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
        int written = _frameWriter.send(amqFrame);
        if (written <= getMaxFrameSize()) {
            return;
        }
        throw new OversizeFrameException(amqFrame, written);
    }

    /**
     * Registers a {@code ConnectionClosingTicker} on the aggregate ticker. Its timeout time is the
     * current time plus the {@code connection.closeResponseTimeout} context value. Work is then
     * signalled via {@code notifyWork()}.
     */
    private void addCloseTicker() {
        long now = System.currentTimeMillis();
        long closeResponseTimeout = (Long)getContextValue(Long.class, "connection.closeResponseTimeout");
        long timeoutTime = now + closeResponseTimeout;
        getAggregateTicker().addTicker((Ticker)new ConnectionClosingTicker(timeoutTime, getNetwork()));
        notifyWork();
    }

    /**
     * @return the current value of the transport-blocked-for-writing flag
     */
    public boolean isTransportBlockedForWriting() {
        return _transportBlockedForWriting;
    }

    /**
     * Updates the transport-blocked-for-writing flag and, only when the value actually changes,
     * notifies the sessions through {@code transportStateChanged()}.
     *
     * @param blocked new value of the flag
     */
    public void setTransportBlockedForWriting(boolean blocked) {
        if (_transportBlockedForWriting == blocked) {
            return;
        }
        _transportBlockedForWriting = blocked;
        transportStateChanged();
    }

    /**
     * Returns an iterator over pending work items.
     *
     * @return a new {@code ProcessPendingIterator} when called on the IO thread, otherwise an
     *         empty iterator
     */
    public Iterator<Runnable> processPendingIterator() {
        if (!isIOThread()) {
            return Collections.emptyIterator();
        }
        return new ProcessPendingIterator();
    }

    /**
     * @return the current value of the state-changed flag
     */
    public boolean hasWork() {
        return _stateChanged.get();
    }

    /**
     * Sets the state-changed flag and invokes the registered work listener, if there is one.
     */
    public void notifyWork() {
        _stateChanged.set(true);
        Action<ProtocolEngine> workListener = _workListener.get();
        if (workListener == null) {
            return;
        }
        workListener.performAction((Object)this);
    }

    /**
     * Adds the session to the set of sessions with work and then signals work on the connection.
     *
     * @param sessionModel session that has work pending
     */
    public void notifyWork(AMQPSession<?, ?> sessionModel) {
        _sessionsWithWork.add(sessionModel);
        notifyWork();
    }

    /**
     * Resets the state-changed flag to {@code false}.
     */
    public void clearWork() {
        _stateChanged.set(false);
    }

    /**
     * Replaces the listener invoked by {@code notifyWork()}.
     *
     * @param listener listener to install; {@code notifyWork()} skips the callback when it is
     *        {@code null}
     */
    public void setWorkListener(Action<ProtocolEngine> listener) {
        _workListener.set(listener);
    }

    /**
     * Always reports that no session with the given name exists; the argument is not inspected.
     *
     * @param name session name (unused)
     * @return {@code false}
     */
    public boolean hasSessionWithName(byte[] name) {
        return false;
    }

    /**
     * Stops the connection and queues an asynchronous task that closes it.
     * <p>
     * The reason is mapped to an error condition: {@code MANAGEMENT} to
     * {@code CONNECTION_FORCED}, {@code TRANSACTION_TIMEOUT} to {@code RESOURCE_LIMIT_EXCEEDED},
     * anything else to {@code INTERNAL_ERROR}.
     *
     * @param reason why the connection is being closed
     * @param description description to report to the peer
     */
    public void sendConnectionCloseAsync(AMQPConnection.CloseReason reason, String description) {
        stopConnection();
        final ErrorCondition condition;
        switch (reason) {
            case MANAGEMENT:
                condition = ConnectionError.CONNECTION_FORCED;
                break;
            case TRANSACTION_TIMEOUT:
                condition = AmqpError.RESOURCE_LIMIT_EXCEEDED;
                break;
            default:
                condition = AmqpError.INTERNAL_ERROR;
                break;
        }
        addAsyncTask((Action<? super ConnectionHandler>)((Action)ignored -> closeConnection(condition, description)));
    }

    /**
     * Queues an asynchronous task that closes the given session under the session's own access
     * control context.
     * <p>
     * The reason is mapped to an error condition: {@code MANAGEMENT} to
     * {@code CONNECTION_FORCED}, {@code TRANSACTION_TIMEOUT} to {@code RESOURCE_LIMIT_EXCEEDED},
     * anything else to {@code INTERNAL_ERROR}. The session is cast to {@code Session_1_0} only
     * when the task runs.
     *
     * @param session session to close
     * @param reason why the session is being closed
     * @param message description to report to the peer
     */
    public void closeSessionAsync(final AMQPSession<?, ?> session, AMQPConnection.CloseReason reason, final String message) {
        final ErrorCondition condition;
        switch (reason) {
            case MANAGEMENT:
                condition = ConnectionError.CONNECTION_FORCED;
                break;
            case TRANSACTION_TIMEOUT:
                condition = AmqpError.RESOURCE_LIMIT_EXCEEDED;
                break;
            default:
                condition = AmqpError.INTERNAL_ERROR;
                break;
        }
        final PrivilegedAction<Void> closeSession = () -> {
            ((Session_1_0)session).close(condition, message);
            return null;
        };
        addAsyncTask((Action<? super ConnectionHandler>)((Action)ignored ->
                AccessController.doPrivileged(closeSession, ((Session_1_0)session).getAccessControllerContext())));
    }

    /**
     * Marks the connection as blocking and schedules {@code doBlock()} on the IO thread.
     * Has no effect when the connection is already marked as blocking. The flag is read and
     * written while holding the blocking lock.
     */
    public void block() {
        synchronized (_blockingLock) {
            if (_blocking) {
                return;
            }
            _blocking = true;
            doOnIOThreadAsync(this::doBlock);
        }
    }

    /**
     * Blocks every session of this connection.
     */
    private void doBlock() {
        Iterator<? extends Session_1_0> sessions = _sessions.iterator();
        while (sessions.hasNext()) {
            sessions.next().block();
        }
    }

    /**
     * @return the remote container id recorded for this connection
     */
    public String getRemoteContainerName() {
        return _remoteContainerId;
    }

    /**
     * @return an unmodifiable view of this connection's sessions
     */
    public Collection<? extends Session_1_0> getSessionModels() {
        Collection<? extends Session_1_0> readOnlyView = Collections.unmodifiableCollection(_sessions);
        return readOnlyView;
    }

    /**
     * Clears the blocking mark and schedules {@code doUnblock()} on the IO thread.
     * Has no effect when the connection is not currently marked as blocking. The flag is read and
     * written while holding the blocking lock.
     */
    public void unblock() {
        synchronized (_blockingLock) {
            if (!_blocking) {
                return;
            }
            _blocking = false;
            doOnIOThreadAsync(this::doUnblock);
        }
    }

    /**
     * Unblocks every session of this connection.
     */
    private void doUnblock() {
        Iterator<? extends Session_1_0> sessions = _sessions.iterator();
        while (sessions.hasNext()) {
            sessions.next().unblock();
        }
    }

    /**
     * @return the channel maximum plus one
     */
    public int getSessionCountLimit() {
        return 1 + _channelMax;
    }

    /**
     * @return the current value of the orderly-close flag
     */
    public boolean isOrderlyClose() {
        return _orderlyClose.get();
    }

    /**
     * @return the description of the error most recently passed to {@code closeConnection},
     *         as stored in the close-cause field
     */
    protected String getCloseCause() {
        return _closeCause;
    }

    /**
     * @return the stored send-SASL-final-challenge-as-challenge setting
     */
    @Override
    public boolean getSendSaslFinalChallengeAsChallenge() {
        return _sendSaslFinalChallengeAsChallenge;
    }

    /**
     * Appends a task to the asynchronous task list and signals that work is available.
     *
     * @param action task to queue
     */
    protected void addAsyncTask(Action<? super ConnectionHandler> action) {
        _asyncTaskList.add(action);
        notifyWork();
    }

    /**
     * Builds an Open performative and sends it on channel 0.
     * <p>
     * Before sending, the connection properties are extended with the entries produced by the
     * port's connection property enrichers, the session tables are allocated if they do not yet
     * exist, and the stored channel maximum is lowered when {@code channelMax} is smaller.
     * The container id is the address space id when an address space is set, otherwise a random
     * UUID.
     *
     * @param channelMax channel maximum to advertise
     * @param maxFrameSize maximum frame size to advertise
     */
    private void sendOpen(int channelMax, int maxFrameSize) {
        // Let each enricher contribute to the property map in turn.
        Map enriched = Collections.emptyMap();
        for (ConnectionPropertyEnricher enricher : getPort().getConnectionPropertyEnrichers()) {
            enriched = enricher.addConnectionProperties((AMQPConnection)this, enriched);
        }
        for (Object item : enriched.entrySet()) {
            Map.Entry property = (Map.Entry)item;
            _properties.put(Symbol.valueOf((String)property.getKey()), property.getValue());
        }

        if (_receivingSessions == null) {
            _receivingSessions = new Session_1_0[channelMax + 1];
            _sendingSessions = new Session_1_0[channelMax + 1];
        }
        if (channelMax < _channelMax) {
            _channelMax = channelMax;
        }

        Open open = new Open();
        open.setChannelMax(UnsignedShort.valueOf((short)channelMax));
        String containerId;
        if (getAddressSpace() == null) {
            containerId = UUID.randomUUID().toString();
        } else {
            containerId = getAddressSpace().getId().toString();
        }
        open.setContainerId(containerId);
        open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
        open.setIdleTimeOut(UnsignedInteger.valueOf(_incomingIdleTimeout));

        boolean hasOfferedCapabilities = _offeredCapabilities != null && !_offeredCapabilities.isEmpty();
        if (hasOfferedCapabilities) {
            open.setOfferedCapabilities(_offeredCapabilities.toArray(new Symbol[_offeredCapabilities.size()]));
        }

        boolean soleConnectionDesired = _remoteDesiredCapabilities != null
                && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER);
        if (soleConnectionDesired) {
            _properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY, SoleConnectionDetectionPolicy.STRONG);
        }
        if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING) {
            _properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY, SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
        }
        open.setProperties(_properties);
        sendFrame(0, open);
    }

    /**
     * Looks up the receiving session bound to a channel.
     * <p>
     * A channel with no session, a channel outside the bounds of the receiving-session table, or
     * a lookup made before that table exists is reported through {@code handleError} as a
     * {@code FRAMING_ERROR}. The channel number comes from the peer, so it is range-checked
     * rather than used to index the table directly.
     *
     * @param channel channel number taken from the received frame
     * @return the session for the channel, or {@code null} when there is none (after the error
     *         has been reported)
     */
    private Session_1_0 getSession(int channel) {
        Session_1_0[] sessions = this._receivingSessions;
        boolean channelInRange = sessions != null && channel >= 0 && channel < sessions.length;
        Session_1_0 session = channelInRange ? sessions[channel] : null;
        if (session == null) {
            Error error = new Error();
            error.setCondition(ConnectionError.FRAMING_ERROR);
            error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
            this.handleError(error);
        }
        return session;
    }

    /**
     * Sends the Close performative on channel 0 and then closes the sender side.
     *
     * @param closeToSend the Close to send
     */
    private void sendClose(Close closeToSend) {
        sendFrame(0, closeToSend);
        closeSender();
    }

    /**
     * Verifies that the connection is in the required state.
     *
     * @param state state the connection must currently be in
     * @throws ConnectionScopedRuntimeException if the current state differs
     */
    private void assertState(ConnectionState state) {
        if (_connectionState == state) {
            return;
        }
        String message = String.format("Unexpected state, client has sent frame in an illegal order.  Required state: %s, actual state: %s", state, _connectionState);
        throw new ConnectionScopedRuntimeException(message);
    }

    /**
     * Returns an iterator over the transactions currently held in the open-transaction table,
     * skipping empty slots.
     * <p>
     * {@code remove()} clears the slot of the element most recently returned by {@code next()},
     * as the {@link Iterator} contract requires. It throws {@link IllegalStateException} when
     * {@code next()} has not been called yet or the element has already been removed.
     *
     * @return iterator over the open transactions
     */
    public Iterator<ServerTransaction> getOpenTransactions() {
        return new Iterator<ServerTransaction>(){
            /** Index of the next slot to examine. */
            int _index = 0;
            /** Slot of the element last returned by next(), or -1 when there is nothing to remove. */
            int _lastReturned = -1;

            @Override
            public boolean hasNext() {
                for (int i = this._index; i < AMQPConnection_1_0Impl.this._openTransactions.length; ++i) {
                    if (AMQPConnection_1_0Impl.this._openTransactions[i] == null) continue;
                    return true;
                }
                return false;
            }

            @Override
            public ServerTransaction next() {
                while (this._index < AMQPConnection_1_0Impl.this._openTransactions.length) {
                    if (AMQPConnection_1_0Impl.this._openTransactions[this._index] != null) {
                        IdentifiedTransaction txn = new IdentifiedTransaction(this._index, AMQPConnection_1_0Impl.this._openTransactions[this._index]);
                        // Remember the slot before advancing so remove() targets this element.
                        this._lastReturned = this._index;
                        ++this._index;
                        return txn.getServerTransaction();
                    }
                    ++this._index;
                }
                throw new NoSuchElementException();
            }

            @Override
            public void remove() {
                if (this._lastReturned < 0) {
                    throw new IllegalStateException("remove() requires a preceding call to next()");
                }
                ((AMQPConnection_1_0Impl)AMQPConnection_1_0Impl.this)._openTransactions[this._lastReturned] = null;
                this._lastReturned = -1;
            }
        };
    }

    /**
     * Creates a new local transaction and stores it in the first free slot of the
     * open-transaction table, using the slot index as its identifier.
     * <p>
     * When the table is full it is replaced by a larger copy: doubled while its length is below
     * 1024, and extended by 1024 slots otherwise.
     *
     * @return the new transaction paired with its identifier
     */
    @Override
    public IdentifiedTransaction createIdentifiedTransaction() {
        ServerTransaction[] current = _openTransactions;
        int capacity = current.length;

        // Find the first empty slot; lands on capacity when every slot is taken.
        int slot = 0;
        while (slot < capacity && current[slot] != null) {
            slot++;
        }

        if (slot == capacity) {
            int grownCapacity;
            if (capacity < 1024) {
                grownCapacity = 2 * capacity;
            } else {
                grownCapacity = capacity + 1024;
            }
            ServerTransaction[] grown = new ServerTransaction[grownCapacity];
            _openTransactions = grown;
            System.arraycopy(current, 0, grown, 0, capacity);
        }

        LocalTransaction created = createLocalTransaction();
        _openTransactions[slot] = created;
        return new IdentifiedTransaction(slot, (ServerTransaction)created);
    }

    /**
     * Returns the transaction stored under the given identifier.
     *
     * @param txnId transaction identifier (index into the open-transaction table)
     * @return the stored transaction; may be {@code null} when the slot is empty
     * @throws UnknownTransactionException if the identifier lies outside the table
     */
    @Override
    public ServerTransaction getTransaction(int txnId) {
        ServerTransaction[] table = _openTransactions;
        if (txnId < 0 || txnId >= table.length) {
            throw new UnknownTransactionException(txnId);
        }
        return table[txnId];
    }

    /**
     * Clears the slot of the transaction with the given identifier.
     *
     * @param txnId transaction identifier (index into the open-transaction table)
     * @throws UnknownTransactionException if the identifier lies outside the table
     */
    @Override
    public void removeTransaction(int txnId) {
        ServerTransaction[] table = _openTransactions;
        if (txnId < 0 || txnId >= table.length) {
            throw new UnknownTransactionException(txnId);
        }
        table[txnId] = null;
    }

    /**
     * Reports whether the connection is still in one of the states that precede {@code OPENED}.
     *
     * @return {@code true} while a header, SASL exchange or Open is awaited; {@code false} once
     *         the connection is opened, closing or closed
     * @throws IllegalStateException if the current state is not one of the handled states
     */
    protected boolean isOpeningInProgress() {
        final boolean opening;
        switch (_connectionState) {
            case AWAIT_AMQP_OR_SASL_HEADER:
            case AWAIT_SASL_INIT:
            case AWAIT_SASL_RESPONSE:
            case AWAIT_AMQP_HEADER:
            case AWAIT_OPEN:
                opening = true;
                break;
            case OPENED:
            case CLOSE_SENT:
            case CLOSE_RECEIVED:
            case CLOSED:
                opening = false;
                break;
            default:
                throw new IllegalStateException("Unsupported state " + _connectionState);
        }
        return opening;
    }

    /**
     * Iterator that hands out units of pending work for the enclosing connection as
     * {@link Runnable}s: per-session processing taken from the sessions-with-work collection, and
     * tasks polled from the asynchronous task list.
     */
    private class ProcessPendingIterator
    implements Iterator<Runnable> {
        /** Cursor over the sessions with work; re-created from the collection when exhausted. */
        private Iterator<? extends AMQPSession<?, ?>> _sessionIterator = AMQPConnection_1_0Impl.this._sessionsWithWork.iterator();

        private ProcessPendingIterator() {
        }

        /**
         * @return {@code true} when session work exists and the connection is neither closed nor
         *         stopped, or when the asynchronous task list is not empty
         */
        @Override
        public boolean hasNext() {
            if (!AMQPConnection_1_0Impl.this._sessionsWithWork.isEmpty()
                    && !AMQPConnection_1_0Impl.this.isClosed()
                    && !AMQPConnection_1_0Impl.this.isConnectionStopped()) {
                return true;
            }
            return !AMQPConnection_1_0Impl.this._asyncTaskList.isEmpty();
        }

        /**
         * Returns the next unit of work.
         * <p>
         * With session work pending on a closed or stopped connection, only asynchronous tasks
         * are handed out (or a no-op when none is queued). With session work pending otherwise,
         * the returned runnable removes the session from the collection, processes it, and
         * re-adds it when {@code processPending()} returns {@code true}. Without session work,
         * the next asynchronous task is returned.
         *
         * @throws NoSuchElementException if there is neither session work nor a queued task
         */
        @Override
        public Runnable next() {
            if (AMQPConnection_1_0Impl.this._sessionsWithWork.isEmpty()) {
                if (AMQPConnection_1_0Impl.this._asyncTaskList.isEmpty()) {
                    throw new NoSuchElementException();
                }
                Action queuedTask = (Action)AMQPConnection_1_0Impl.this._asyncTaskList.poll();
                return () -> queuedTask.performAction((Object)AMQPConnection_1_0Impl.this);
            }

            if (AMQPConnection_1_0Impl.this.isClosed() || AMQPConnection_1_0Impl.this.isConnectionStopped()) {
                Action queuedTask = (Action)AMQPConnection_1_0Impl.this._asyncTaskList.poll();
                if (queuedTask == null) {
                    return () -> {};
                }
                return () -> queuedTask.performAction((Object)AMQPConnection_1_0Impl.this);
            }

            if (!this._sessionIterator.hasNext()) {
                // Cursor exhausted: start another pass over the sessions with work.
                this._sessionIterator = AMQPConnection_1_0Impl.this._sessionsWithWork.iterator();
            }
            AMQPSession<?, ?> nextSession = this._sessionIterator.next();
            return () -> {
                this._sessionIterator.remove();
                if (nextSession.processPending()) {
                    AMQPConnection_1_0Impl.this._sessionsWithWork.add(nextSession);
                }
            };
        }

        /**
         * Removal is not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

