/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RejectType;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.CachedFrame;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTagInUseException;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.protocol.v0_8.CreditRestorer;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
import org.apache.qpid.server.protocol.v0_8.InfiniteCreditCreditManager;
import org.apache.qpid.server.protocol.v0_8.MessageConsumerAssociation;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.AccessRequestOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicNackBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicRecoverSyncOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
import org.apache.qpid.server.protocol.v0_8.transport.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueuePurgeOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ServerChannelMethodProcessor;
import org.apache.qpid.server.protocol.v0_8.transport.TxCommitOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.TxRollbackOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.AsyncCommand;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownAlternateBindingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQChannel
extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0_8>
implements AsyncAutoCommitTransaction.FutureRecorder,
ServerChannelMethodProcessor,
EventLoggerProvider,
CreditRestorer,
Deletable<AMQChannel> {
    // NOTE(review): the constructor passes the literal 4096 to UnacknowledgedMessageMapImpl;
    // presumably that is this constant inlined by the compiler — confirm against the original source.
    public static final int DEFAULT_PREFETCH = 4096;
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
    // Shared credit manager handed to get, browser and no-ack consumer targets (see performGet / consumeFromSource).
    private static final InfiniteCreditCreditManager INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
    // Maps a message/consumer association to the message instance it carries.
    private static final Function<MessageConsumerAssociation, MessageInstance> MESSAGE_INSTANCE_FUNCTION = new Function<MessageConsumerAssociation, MessageInstance>(){

        public MessageInstance apply(MessageConsumerAssociation input) {
            return input.getMessageInstance();
        }
    };
    private static final String ALTERNATE_EXCHANGE = "alternateExchange";
    private final DefaultQueueAssociationClearingTask _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
    private final int _channelId;
    // Credit manager passed to acknowledging consumer targets created by consumeFromSource.
    private final Pre0_10CreditManager _creditManager;
    private final boolean _forceMessageValidation;
    // Last delivery tag handed out; pre-incremented by getNextDeliveryTag().
    private long _deliveryTag = 0L;
    private volatile Queue<?> _defaultQueue;
    // Counter behind the generated "sgen_<n>" consumer tags.
    private int _consumerTag;
    // Message currently being assembled from publish/header/body frames; reset to null once delivered.
    private IncomingMessage _currentMessage;
    // Consumer tag -> consumer target for every subscription registered on this channel.
    private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
    private final MessageStore _messageStore;
    // Fully qualified because the simple name Queue resolves to org.apache.qpid.server.model.Queue in this file.
    private final java.util.Queue<AsyncCommand> _unfinishedCommandsQueue = new ConcurrentLinkedQueue<AsyncCommand>();
    private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    private volatile ServerTransaction _transaction;
    private final AMQPConnection_0_8 _connection;
    // Flipped exactly once by close(int, String); later close calls return immediately.
    private final AtomicBoolean _closing = new AtomicBoolean(false);
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet());
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    private volatile boolean _rollingBack;
    private List<MessageConsumerAssociation> _resendList = new ArrayList<MessageConsumerAssociation>();
    private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = AMQShortString.createAMQShortString("Immediate delivery is not possible.");
    private final ClientDeliveryMethod _clientDeliveryMethod;
    // Passed to RoutingResult.send(...) when the published message has the immediate flag set.
    private final ImmediateAction _immediateAction = new ImmediateAction();
    private long _blockTime;
    // Initialised from the broker context value "channel.flowControlEnforcementTimeout".
    private long _blockingTimeout;
    // When true, each published message is answered with a Basic.Ack / Basic.Nack frame.
    private boolean _confirmOnPublish;
    // Incremented per published message while _confirmOnPublish is set; used as the tag of the confirm frames.
    private long _confirmedMessageCounter;
    private boolean _wireBlockingState;
    private boolean _prefetchLoggedForChannel = false;
    private boolean _logChannelFlowMessages = true;
    private final CachedFrame _txCommitOkFrame;
    private boolean _channelFlow = true;
    // "(identityHashCode)" suffix appended to the channel id by debugIdentity().
    private final String id = "(" + System.identityHashCode(this) + ")";

    /**
     * Creates a channel on the given connection.
     *
     * <p>Reads the prefetch/batch limits and the validation flag from the connection context,
     * the flow-control enforcement timeout from the broker context, starts the channel in
     * auto-commit mode and emits the channel "create" operational log message under the
     * session's access-control context.
     *
     * @param connection   the owning AMQP 0-8 connection
     * @param channelId    the channel number on that connection
     * @param messageStore the store used for incoming messages and the auto-commit transaction
     */
    public AMQChannel(AMQPConnection_0_8 connection, int channelId, MessageStore messageStore) {
        super((Connection)connection, channelId);
        final Long highPrefetchLimit = (Long)connection.getContextValue(Long.class, "connection.high_prefetch_limit");
        final Long batchLimit = (Long)connection.getContextValue(Long.class, "connection.batch_limit");
        _creditManager = new Pre0_10CreditManager(0L, 0L, highPrefetchLimit, batchLimit);
        _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH, this);
        _connection = connection;
        _channelId = channelId;
        _messageStore = messageStore;
        _blockingTimeout = (Long)connection.getBroker().getContextValue(Long.class, "channel.flowControlEnforcementTimeout");
        _transaction = new AsyncAutoCommitTransaction(_messageStore, (AsyncAutoCommitTransaction.FutureRecorder)this);

        // Pre-build the Tx.CommitOk frame once for this channel.
        final TxCommitOkBody commitOkBody = _connection.getMethodRegistry().createTxCommitOkBody();
        _txCommitOkFrame = new CachedFrame(commitOkBody.generateFrame(_channelId));
        _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);

        // The cast disambiguates between the PrivilegedAction and PrivilegedExceptionAction overloads.
        AccessController.doPrivileged((PrivilegedAction<Object>)() -> {
            message(ChannelMessages.CREATE());
            return null;
        }, _accessControllerContext);
        _forceMessageValidation = (Boolean)connection.getContextValue(Boolean.class, "qpid.connection.forceValidation");
    }

    /**
     * Publishes an operational log message through this session's event logger.
     *
     * @param message the message to log
     */
    private void message(LogMessage message) {
        getEventLogger().message(message);
    }

    /**
     * @return the access-control context under which this channel's privileged actions run
     */
    public AccessControlContext getAccessControllerContext() {
        return _accessControllerContext;
    }

    /**
     * Attempts to deliver a single message from {@code queue} via a temporary consumer.
     *
     * <p>A transient, acquiring consumer target is attached to the queue, asked to send
     * messages until either it reports nothing more can be sent or one message has been
     * delivered, and is then closed.
     *
     * @param queue the source to fetch from
     * @param acks  {@code true} to create an acknowledging get target, {@code false} for a no-ack one
     * @return {@code true} if a message was delivered
     */
    private boolean performGet(MessageSource queue, boolean acks) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused, MessageSource.QueueDeleted {
        final GetDeliveryMethod deliveryMethod = new GetDeliveryMethod(queue);
        final EnumSet<ConsumerOption> consumerOptions = EnumSet.of(ConsumerOption.TRANSIENT, ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES);

        final ConsumerTarget_0_8 getTarget;
        if (acks) {
            getTarget = ConsumerTarget_0_8.createGetAckTarget(this, AMQShortString.EMPTY_STRING, null, INFINITE_CREDIT_CREDIT_MANAGER, deliveryMethod);
        } else {
            getTarget = ConsumerTarget_0_8.createGetNoAckTarget(this, AMQShortString.EMPTY_STRING, null, INFINITE_CREDIT_CREDIT_MANAGER, deliveryMethod);
        }

        queue.addConsumer((ConsumerTarget)getTarget, null, AMQMessage.class, "", consumerOptions, null);
        getTarget.updateNotifyWorkDesired();

        // Keep pumping until the target has nothing more to send or one message went out.
        boolean moreToSend;
        do {
            moreToSend = getTarget.sendNextMessage();
        } while (moreToSend && !deliveryMethod.hasDeliveredMessage());

        getTarget.close();
        return deliveryMethod.hasDeliveredMessage();
    }

    /**
     * @return whether the channel's current transaction reports itself as transactional
     */
    boolean isTransactional() {
        final ServerTransaction current = _transaction;
        return current.isTransactional();
    }

    /**
     * @return the transaction currently associated with this channel
     */
    ServerTransaction getTransaction() {
        return _transaction;
    }

    /**
     * Invokes {@code sync()} under this channel's access-control context.
     */
    public void receivedComplete() {
        // The cast disambiguates between the PrivilegedAction and PrivilegedExceptionAction overloads.
        final PrivilegedAction<Void> syncAction = (PrivilegedAction<Void>)() -> {
            sync();
            return null;
        };
        AccessController.doPrivileged(syncAction, getAccessControllerContext());
    }

    /**
     * Starts assembling a new incoming message for the given publish request.
     *
     * @param info the publish information received from the client
     * @param e    the destination the message will be routed through
     */
    private void setPublishFrame(MessagePublishInfo info, MessageDestination e) {
        final IncomingMessage incoming = new IncomingMessage(info);
        _currentMessage = incoming;
        incoming.setMessageDestination(e);
    }

    /**
     * Records the content header on the message being assembled and delivers the
     * message straight away if no further content is expected.
     *
     * @param contentHeaderBody the content header frame body
     */
    private void publishContentHeader(ContentHeaderBody contentHeaderBody) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Content header received on channel " + _channelId);
        }
        _currentMessage.setContentHeaderBody(contentHeaderBody);
        deliverCurrentMessageIfComplete();
    }

    /**
     * Stores and routes the message currently being assembled, once all of its content has arrived.
     *
     * <p>Steps, in order: authorise the publishing principal and the publish itself, bump the
     * publisher-confirm counter (when confirms are enabled), write metadata and content chunks to
     * the message store, route the message through its destination and enqueue it in the channel's
     * transaction. If nothing was enqueued the message is either returned to the publisher,
     * discarded, or causes the connection to be closed, depending on the mandatory/immediate flags,
     * the transactional state and the connection's close-when-no-route setting.
     *
     * <p>An {@link AccessControlException} from the authorisation checks closes the connection with
     * reply code 403.
     *
     * <p>NOTE(review): the CFR decompiler flagged this method with "Removed try catching itself -
     * possible behaviour change", so the exception-handling shape may differ from the original source.
     */
    private void deliverCurrentMessageIfComplete() {
        if (this._currentMessage.allContentReceived()) {
            MessagePublishInfo info = this._currentMessage.getMessagePublishInfo();
            String routingKey = AMQShortString.toString(info.getRoutingKey());
            String exchangeName = AMQShortString.toString(info.getExchange());
            try {
                MessageDestination destination = this._currentMessage.getDestination();
                ContentHeaderBody contentHeader = this._currentMessage.getContentHeader();
                // Both checks run before anything is stored; a refusal is handled by the outer catch.
                this._connection.checkAuthorizedMessagePrincipal(AMQShortString.toString(contentHeader.getProperties().getUserId()));
                this._publishAuthCache.authorisePublish(destination, routingKey, info.isImmediate(), this._connection.getLastReadTime());
                if (this._confirmOnPublish) {
                    // The counter value becomes the tag of the ack/nack sent for this message.
                    ++this._confirmedMessageCounter;
                }
                long bodySize = this._currentMessage.getSize();
                try {
                    MessageMetaData messageMetaData = new MessageMetaData(info, contentHeader, this.getConnection().getLastReadTime());
                    MessageHandle handle = this._messageStore.addMessage((StorableMessageMetaData)messageMetaData);
                    int bodyCount = this._currentMessage.getBodyCount();
                    if (bodyCount > 0) {
                        // Hand each received chunk to the store, then dispose of the chunk.
                        for (int i = 0; i < bodyCount; ++i) {
                            ContentBody contentChunk = this._currentMessage.getContentChunk(i);
                            handle.addContent(contentChunk.getPayload());
                            contentChunk.dispose();
                        }
                    }
                    StoredMessage storedMessage = handle.allContentAdded();
                    final AMQMessage amqMessage = new AMQMessage((StoredMessage<MessageMetaData>)storedMessage, this._connection.getReference());
                    // The reference is held for the duration of routing and released by try-with-resources.
                    try (MessageReference reference = amqMessage.newReference();){
                        this._currentMessage = null;
                        // Exposes the per-delivery properties of the new message to the routing layer.
                        InstanceProperties instanceProperties = new InstanceProperties(){

                            public Object getProperty(InstanceProperties.Property prop) {
                                switch (prop) {
                                    case EXPIRATION: {
                                        return amqMessage.getExpiration();
                                    }
                                    case IMMEDIATE: {
                                        return amqMessage.isImmediate();
                                    }
                                    case PERSISTENT: {
                                        return amqMessage.isPersistent();
                                    }
                                    case MANDATORY: {
                                        return amqMessage.isMandatory();
                                    }
                                    case REDELIVERED: {
                                        // A freshly published message is never a redelivery.
                                        return false;
                                    }
                                }
                                return null;
                            }
                        };
                        RoutingResult result = destination.route((ServerMessage)amqMessage, amqMessage.getInitialRoutingAddress(), instanceProperties);
                        // The immediate action is only supplied for messages published with the immediate flag.
                        int enqueues = result.send(this._transaction, (Action)(amqMessage.isImmediate() ? this._immediateAction : null));
                        if (enqueues == 0) {
                            // Nothing was enqueued: decide between closing, returning and discarding.
                            boolean mandatory = amqMessage.isMandatory();
                            boolean closeOnNoRoute = this._connection.isCloseWhenNoRoute();
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Unroutable message exchange='{}', routing key='{}', mandatory={}, transactionalSession={}, closeOnNoRoute={}, confirmOnPublish={}", new Object[]{exchangeName, routingKey, mandatory, this.isTransactional(), closeOnNoRoute, this._confirmOnPublish});
                            }
                            // 312 is presumably the AMQP "no-route" reply code — confirm against the broker's error-code constants.
                            int errorCode = 312;
                            String errorMessage = String.format("No route for message with exchange '%s' and routing key '%s'", exchangeName, routingKey);
                            if (result.containsReject(new RejectType[]{RejectType.LIMIT_EXCEEDED})) {
                                // 506 is presumably the AMQP "resource-error" reply code — confirm.
                                errorCode = 506;
                                errorMessage = errorMessage + ":" + result.getRejectReason();
                            }
                            if (mandatory && this.isTransactional() && !this._confirmOnPublish && this._connection.isCloseWhenNoRoute()) {
                                this._connection.sendConnectionClose(errorCode, errorMessage, this._channelId);
                            } else if (mandatory || amqMessage.isImmediate()) {
                                if (this._confirmOnPublish) {
                                    this._connection.writeFrame(new AMQFrame(this._channelId, new BasicNackBody(this._confirmedMessageCounter, false, false)));
                                }
                                // The return to the publisher is registered as a post-transaction action.
                                this._transaction.addPostTransactionAction((ServerTransaction.Action)new WriteReturnAction(errorCode, errorMessage, amqMessage));
                            } else {
                                // Neither mandatory nor immediate: acknowledge (if confirming) and log the discard.
                                if (this._confirmOnPublish) {
                                    this._connection.writeFrame(new AMQFrame(this._channelId, new BasicAckBody(this._confirmedMessageCounter, false)));
                                }
                                this.message(ExchangeMessages.DISCARDMSG((String)exchangeName, (String)routingKey));
                            }
                        } else if (this._confirmOnPublish) {
                            // Routed successfully: the confirm is written from the action's postCommit / onRollback callbacks.
                            this.recordFuture((ListenableFuture<Void>)Futures.immediateFuture(null), new ServerTransaction.Action(){
                                // Snapshot of the confirm counter taken when the action is created.
                                private final long _deliveryTag;
                                {
                                    this._deliveryTag = AMQChannel.this._confirmedMessageCounter;
                                }

                                public void postCommit() {
                                    BasicAckBody body = AMQChannel.this._connection.getMethodRegistry().createBasicAckBody(this._deliveryTag, false);
                                    AMQChannel.this._connection.writeFrame(body.generateFrame(AMQChannel.this._channelId));
                                }

                                public void onRollback() {
                                    BasicNackBody body = new BasicNackBody(this._deliveryTag, false, false);
                                    AMQChannel.this._connection.writeFrame(new AMQFrame(AMQChannel.this._channelId, body));
                                }
                            });
                        }
                    }
                }
                finally {
                    // Runs whether or not storing/routing succeeded: update statistics and drop the assembled message.
                    this.registerMessageReceived(bodySize);
                    if (this.isTransactional()) {
                        this.registerTransactedMessageReceived();
                    }
                    this._currentMessage = null;
                }
            }
            catch (AccessControlException e) {
                // 403 is presumably the AMQP "access-refused" reply code — confirm.
                this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
            }
        }
    }

    /**
     * Appends a content body frame to the message being assembled.
     *
     * <p>If the accumulated size exceeds the size announced for the message, the connection is
     * closed with reply code 501; otherwise the message is delivered if it is now complete.
     * Any {@link RuntimeException} discards the message being assembled before propagating.
     *
     * @param contentBody the content body frame received from the client
     */
    private void publishContentBody(ContentBody contentBody) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(debugIdentity() + " content body received on channel " + _channelId);
        }
        try {
            final long receivedSoFar = _currentMessage.addContentBodyFrame(contentBody);
            if (receivedSoFar <= _currentMessage.getSize()) {
                deliverCurrentMessageIfComplete();
                return;
            }
            _connection.sendConnectionClose(501, "More message data received than content header defined", _channelId);
        }
        catch (RuntimeException failure) {
            _currentMessage = null;
            throw failure;
        }
    }

    /**
     * Advances the channel's delivery-tag counter.
     *
     * @return the newly allocated delivery tag
     */
    public long getNextDeliveryTag() {
        _deliveryTag += 1L;
        return _deliveryTag;
    }

    /**
     * Advances the counter used for server-generated consumer tags.
     *
     * @return the newly allocated counter value
     */
    private int getNextConsumerTag() {
        _consumerTag += 1;
        return _consumerTag;
    }

    /**
     * Registers a consumer with the given tag on every source in {@code sources}.
     *
     * <p>A tag of the form {@code sgen_<n>} is generated when {@code tag} is {@code null}.
     * The consumer target is a browser target when the no-consume argument is {@code TRUE},
     * otherwise an acknowledging or no-ack target according to {@code acks}. Filters are built
     * from {@code arguments}, extended by a no-local filter and a replay-period arrival-time
     * filter when requested; an {@code x-priority} argument sets the consumer priority.
     * If registration fails with one of the caught exceptions the tag is unregistered again
     * before the exception propagates.
     *
     * @param tag       the requested consumer tag, or {@code null} to have one generated
     * @param sources   the message sources to consume from
     * @param acks      whether deliveries must be acknowledged
     * @param arguments consume arguments, may be {@code null}
     * @param exclusive whether to request exclusive access
     * @param noLocal   whether to add a no-local filter
     * @return the consumer tag in use
     * @throws ConsumerTagInUseException   if the tag is already registered on this channel
     * @throws AMQInvalidArgumentException if the filter arguments or the replay period are invalid
     */
    private AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable arguments, boolean exclusive, boolean noLocal) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, MessageSource.ConsumerAccessRefused, ConsumerTagInUseException, MessageSource.QueueDeleted {
        if (tag == null) {
            tag = AMQShortString.createAMQShortString("sgen_" + getNextConsumerTag());
        }
        if (_tag2SubscriptionTargetMap.containsKey(tag)) {
            throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
        }

        final EnumSet<ConsumerOption> consumerOptions = EnumSet.noneOf(ConsumerOption.class);
        final boolean multiQueue = sources.size() > 1;
        final boolean browseOnly = arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()));

        final ConsumerTarget_0_8 target;
        if (browseOnly) {
            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
        } else {
            if (acks) {
                target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
            } else {
                target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
            }
            // Every non-browsing consumer acquires messages and sees requeues.
            consumerOptions.add(ConsumerOption.ACQUIRES);
            consumerOptions.add(ConsumerOption.SEES_REQUEUES);
        }
        if (exclusive) {
            consumerOptions.add(ConsumerOption.EXCLUSIVE);
        }

        _tag2SubscriptionTargetMap.put(tag, target);
        try {
            FilterManager filters = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
            if (noLocal) {
                if (filters == null) {
                    filters = new FilterManager();
                }
                final NoLocalFilter noLocalFilter = new NoLocalFilter();
                filters.add(noLocalFilter.getName(), (MessageFilter)noLocalFilter);
            }

            final String replayPeriodKey = AMQPFilterTypes.REPLAY_PERIOD.toString();
            if (arguments != null && arguments.containsKey(replayPeriodKey)) {
                final Object rawPeriod = arguments.get(replayPeriodKey);
                long period;
                if (rawPeriod instanceof Number) {
                    period = ((Number)rawPeriod).longValue();
                } else if (rawPeriod instanceof String) {
                    try {
                        period = Long.parseLong(rawPeriod.toString());
                    }
                    catch (NumberFormatException e) {
                        throw new AMQInvalidArgumentException("Cannot parse value " + rawPeriod + " as a number for filter " + replayPeriodKey);
                    }
                } else {
                    throw new AMQInvalidArgumentException("Cannot parse value " + rawPeriod + " as a number for filter " + replayPeriodKey);
                }
                // The period is multiplied by 1000 and subtracted from the current time in milliseconds.
                final long startingFrom = System.currentTimeMillis() - 1000L * period;
                if (filters == null) {
                    filters = new FilterManager();
                }
                final ArrivalTimeFilter arrivalFilter = new ArrivalTimeFilter(startingFrom, period == 0L);
                filters.add(arrivalFilter.getName(), (MessageFilter)arrivalFilter);
            }

            Integer priority = null;
            if (arguments != null && arguments.containsKey("x-priority")) {
                final Object rawPriority = arguments.get("x-priority");
                if (rawPriority instanceof Number) {
                    priority = ((Number)rawPriority).intValue();
                } else if (rawPriority instanceof String || rawPriority instanceof AMQShortString) {
                    try {
                        priority = Integer.parseInt(rawPriority.toString());
                    }
                    catch (NumberFormatException ignored) {
                        // An unparsable priority is deliberately ignored; priority stays null.
                    }
                }
            }

            final String tagName = AMQShortString.toString(tag);
            for (MessageSource source : sources) {
                source.addConsumer((ConsumerTarget)target, filters, AMQMessage.class, tagName, consumerOptions, priority);
            }
            target.updateNotifyWorkDesired();
        }
        catch (AccessControlException | AMQInvalidArgumentException | MessageSource.ConsumerAccessRefused | MessageSource.ExistingConsumerPreventsExclusive | MessageSource.ExistingExclusiveConsumer | MessageSource.QueueDeleted e) {
            // Undo the registration so the tag can be reused after a failed subscribe.
            _tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        return tag;
    }

    /**
     * Removes and closes the consumer registered under {@code consumerTag}.
     *
     * @param consumerTag the tag of the consumer to cancel
     * @return {@code true} if a consumer was registered under the tag, {@code false} otherwise
     */
    private boolean unsubscribeConsumer(AMQShortString consumerTag) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Unsubscribing consumer '{}' on channel {}", consumerTag, this);
        }
        final ConsumerTarget_0_8 removed = _tag2SubscriptionTargetMap.remove(consumerTag);
        if (removed == null) {
            LOGGER.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered.");
            return false;
        }
        removed.close();
        return true;
    }

    /**
     * Closes the channel without a cause, i.e. {@code close(0, null)}.
     */
    public void close() {
        final int noCause = 0;
        close(noCause, null);
    }

    /**
     * Closes the channel; only the first call has any effect.
     *
     * <p>Consumers are unsubscribed, the default queue association is cleared, registered
     * close tasks are run, the current transaction is rolled back (updating the connection's
     * transaction counters and tickers for a {@link LocalTransaction}) and unacknowledged
     * messages are requeued. Whether or not any of that fails, the channel is then disposed
     * and the close is written to the operational log exactly once.
     *
     * <p>The decompiler had expanded the original {@code finally} into a duplicated
     * success path plus a {@code catch (Throwable)}, so a failure inside the cleanup itself
     * re-ran {@code dispose()}. The cleanup is expressed as a single {@code finally} here.
     *
     * @param cause   the reply code that forced the close, or {@code 0} for a normal close
     * @param message the reply text accompanying a forced close; unused when {@code cause} is 0
     */
    public void close(int cause, String message) {
        if (!this._closing.compareAndSet(false, true)) {
            return;
        }
        try {
            this.unsubscribeAllConsumers();
            this.setDefaultQueue(null);
            for (Action task : this._taskList) {
                task.performAction((Object)this);
            }
            if (this._transaction instanceof LocalTransaction) {
                if (((LocalTransaction)this._transaction).hasOutstandingWork()) {
                    this._connection.incrementTransactionRollbackCounter();
                }
                this._connection.decrementTransactionOpenCounter();
                this._connection.unregisterTransactionTickers(this._transaction);
            }
            this._transaction.rollback();
            this.requeue();
        }
        finally {
            // Runs once on both the normal and the exceptional path.
            this.dispose();
            LogMessage operationalLogMessage = cause == 0 ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED((Number)cause, (String)message);
            this.messageWithSubject(operationalLogMessage);
        }
    }

    /**
     * Publishes an operational log message together with this channel's log subject.
     *
     * @param operationalLogMessage the message to log
     */
    private void messageWithSubject(LogMessage operationalLogMessage) {
        getEventLogger().message(_logSubject, operationalLogMessage);
    }

    /**
     * Unsubscribes every consumer currently registered on this channel.
     */
    private void unsubscribeAllConsumers() {
        if (LOGGER.isDebugEnabled()) {
            final String prefix = _tag2SubscriptionTargetMap.isEmpty()
                    ? "No consumers to unsubscribe on channel "
                    : "Unsubscribing all consumers on channel ";
            LOGGER.debug(prefix + toString());
        }
        // Iterate over a snapshot: unsubscribeConsumer removes entries from the live map.
        for (AMQShortString tag : new HashSet<AMQShortString>(_tag2SubscriptionTargetMap.keySet())) {
            unsubscribeConsumer(tag);
        }
    }

    /**
     * Records a delivered message as awaiting acknowledgement.
     *
     * @param entry       the delivered message instance
     * @param deliveryTag the delivery tag it was sent with
     * @param consumer    the consumer it was delivered to
     * @param usesCredit  passed through to the unacknowledged message map
     */
    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, MessageInstanceConsumer consumer, boolean usesCredit) {
        if (LOGGER.isDebugEnabled()) {
            final String description = debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + ") for " + consumer + " on " + entry.getOwningResource().getName();
            LOGGER.debug(description);
        }
        _unacknowledgedMessageMap.add(deliveryTag, entry, consumer, usesCredit);
    }

    /**
     * @return the channel id followed by the channel's identity-hash suffix, for debug logging
     */
    private String debugIdentity() {
        return _channelId + id;
    }

    /**
     * Releases every unacknowledged message back to its consumer's source, marking each as redelivered.
     *
     * <p>The unacknowledged entries are first copied in visiting order, then removed from the
     * map and released one by one.
     */
    private void requeue() {
        final Map<Long, MessageConsumerAssociation> pending = new LinkedHashMap<>();
        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor(){

            @Override
            public boolean callback(long deliveryTag, MessageConsumerAssociation messageConsumerPair) {
                pending.put(deliveryTag, messageConsumerPair);
                return false;
            }

            @Override
            public void visitComplete() {
            }
        });

        if (!pending.isEmpty() && LOGGER.isDebugEnabled()) {
            LOGGER.debug("Requeuing {} unacked messages", pending.size());
        }

        for (Map.Entry<Long, MessageConsumerAssociation> pendingEntry : pending.entrySet()) {
            final MessageInstance instance = pendingEntry.getValue().getMessageInstance();
            final MessageInstanceConsumer owner = pendingEntry.getValue().getConsumer();
            instance.setRedelivered();
            _unacknowledgedMessageMap.remove(pendingEntry.getKey(), true);
            instance.release(owner);
        }
    }

    /**
     * Removes a single delivery from the unacknowledged map and releases its message.
     * Logs a warning and does nothing else when the tag is not in the map.
     *
     * @param deliveryTag the delivery tag to requeue
     */
    private void requeue(long deliveryTag) {
        MessageConsumerAssociation removed = _unacknowledgedMessageMap.remove(deliveryTag, true);
        if (removed == null) {
            LOGGER.warn("Requested requeue of message: {} but no such delivery tag exists.", (Object) deliveryTag);
            return;
        }
        MessageInstance instance = removed.getMessageInstance();
        instance.setRedelivered();
        instance.release(removed.getConsumer());
    }

    /**
     * Tells whether the unacknowledged message with this tag has a positive maximum delivery count.
     *
     * @param deliveryTag the delivery tag to look up
     * @return {@code true} if the tag is known and its maximum delivery count is greater than zero
     */
    private boolean isMaxDeliveryCountEnabled(long deliveryTag) {
        MessageInstance instance = _unacknowledgedMessageMap.get(deliveryTag);
        return instance != null && instance.getMaximumDeliveryCount() > 0;
    }

    /**
     * Tells whether the unacknowledged message with this tag has reached its maximum delivery count.
     *
     * @param deliveryTag the delivery tag to look up
     * @return {@code true} if the tag is known, its maximum delivery count is non-zero and its
     *         delivery count is at least that maximum
     */
    private boolean isDeliveredTooManyTimes(long deliveryTag) {
        MessageInstance instance = _unacknowledgedMessageMap.get(deliveryTag);
        if (instance == null) {
            return false;
        }
        int limit = instance.getMaximumDeliveryCount();
        int deliveries = instance.getDeliveryCount();
        return limit != 0 && deliveries >= limit;
    }

    /**
     * Re-sends every unacknowledged message to its consumer, or releases it when that is not possible.
     *
     * <p>Messages whose consumer is closed, or whose acquisition cannot be made unstealable,
     * are released instead of being re-sent. Both groups are processed in the order in which
     * the unacknowledged map visited them.
     */
    private void resend() {
        final Map<Long, MessageConsumerAssociation> msgToRequeue = new LinkedHashMap<>();
        final Map<Long, MessageConsumerAssociation> msgToResend = new LinkedHashMap<>();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Unacknowledged messages: {}", _unacknowledgedMessageMap.size());
        }
        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() {

            @Override
            public boolean callback(long deliveryTag, MessageConsumerAssociation association) {
                // Split by consumer state: closed consumers cannot receive a resend.
                if (association.getConsumer().isClosed()) {
                    msgToRequeue.put(deliveryTag, association);
                } else {
                    msgToResend.put(deliveryTag, association);
                }
                return false;
            }

            @Override
            public void visitComplete() {
                // nothing to do once partitioning is complete
            }
        });
        for (Map.Entry<Long, MessageConsumerAssociation> entry : msgToResend.entrySet()) {
            long deliveryTag = entry.getKey();
            MessageConsumerAssociation association = entry.getValue();
            MessageInstance message = association.getMessageInstance();
            MessageInstanceConsumer consumer = association.getConsumer();
            message.setRedelivered();
            if (message.makeAcquisitionUnstealable(consumer)) {
                message.decrementDeliveryCount();
                consumer.getTarget().send(consumer, message, false);
                // The old tag is dropped after the send; note the 'false' flag here versus 'true' below.
                _unacknowledgedMessageMap.remove(deliveryTag, false);
            } else {
                // Could not pin the acquisition: fall back to releasing the message.
                msgToRequeue.put(deliveryTag, association);
            }
        }
        for (Map.Entry<Long, MessageConsumerAssociation> entry : msgToRequeue.entrySet()) {
            long deliveryTag = entry.getKey();
            MessageConsumerAssociation association = entry.getValue();
            MessageInstance message = association.getMessageInstance();
            MessageInstanceConsumer consumer = association.getConsumer();
            message.decrementDeliveryCount();
            _unacknowledgedMessageMap.remove(deliveryTag, true);
            message.setRedelivered();
            message.release(consumer);
        }
    }

    /** Returns the map holding this channel's unacknowledged deliveries. */
    private UnacknowledgedMessageMap getUnacknowledgedMessageMap() {
        return _unacknowledgedMessageMap;
    }

    /**
     * Updates the suspended flag and, when it actually changes, logs the flow transition
     * and notifies consumers.
     *
     * @param suspended the new suspended state
     */
    private void setSuspended(boolean suspended) {
        boolean previouslySuspended = _suspended.getAndSet(suspended);
        if (previouslySuspended == suspended) {
            return;
        }
        if (suspended) {
            // Transition running -> suspended: only the log entry is needed.
            if (_logChannelFlowMessages) {
                messageWithSubject(ChannelMessages.FLOW("Stopped"));
            }
            return;
        }
        // Transition suspended -> running: log first, then notify every consumer.
        if (_logChannelFlowMessages) {
            messageWithSubject(ChannelMessages.FLOW("Started"));
        }
        for (ConsumerTarget_0_8 target : getConsumerTargets()) {
            for (MessageInstanceConsumer consumer : target.getConsumers()) {
                consumer.externalStateChange();
            }
        }
    }

    /**
     * Commits the current transaction and bumps the connection's transaction-begin counter.
     *
     * @param immediateAction action handed to the transaction's commit call
     * @param async           when {@code true} and the transaction is a {@link LocalTransaction},
     *                        {@code commitAsync} is used and the counter is bumped from the callback
     */
    private void commit(final Runnable immediateAction, boolean async) {
        if (!async || !(_transaction instanceof LocalTransaction)) {
            _transaction.commit(immediateAction);
            _connection.incrementTransactionBeginCounter();
            return;
        }
        Runnable onCommitted = () -> {
            try {
                immediateAction.run();
            }
            finally {
                // Count the new transaction even if the action throws.
                _connection.incrementTransactionBeginCounter();
            }
        };
        ((LocalTransaction) _transaction).commitAsync(onCommitted);
    }

    /**
     * Rolls back the current transaction, runs the supplied task, then re-sends or releases
     * every association queued in {@code _resendList}.
     *
     * <p>If the channel was not already suspended it is suspended for the duration of the
     * rollback and un-suspended at the end, at which point all consumers are notified.
     *
     * @param postRollbackTask task run after the transaction rollback has finished
     */
    private void rollback(Runnable postRollbackTask) {
        _rollingBack = true;
        final boolean suspendedHere = _suspended.compareAndSet(false, true);
        try {
            _transaction.rollback();
        }
        finally {
            _rollingBack = false;
            _connection.incrementTransactionRollbackCounter();
            _connection.incrementTransactionBeginCounter();
        }
        postRollbackTask.run();
        for (MessageConsumerAssociation pending : _resendList) {
            MessageInstance instance = pending.getMessageInstance();
            MessageInstanceConsumer owner = pending.getConsumer();
            // Resend only to an open consumer, with a pinned acquisition and available credit.
            boolean resendable = !owner.isClosed()
                    && instance.makeAcquisitionUnstealable(owner)
                    && _creditManager.useCreditForMessage(pending.getSize());
            if (resendable) {
                owner.getTarget().send(owner, instance, false);
            } else {
                instance.release(owner);
            }
        }
        _resendList.clear();
        if (!suspendedHere) {
            return;
        }
        _suspended.set(false);
        for (ConsumerTarget_0_8 target : getConsumerTargets()) {
            for (MessageInstanceConsumer consumer : target.getConsumers()) {
                consumer.externalStateChange();
            }
        }
    }

    /**
     * Describes the channel as {@code (suspended, closing, connectionClosing) [connection:channelId]}.
     */
    @Override
    public String toString() {
        String flags = "(" + _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ")";
        String location = "[" + _connection.toString() + ":" + _channelId + "]";
        return flags + " " + location;
    }

    /**
     * @return {@code true} when either this channel or its connection is closing
     */
    public boolean isClosing() {
        if (_closing.get()) {
            return true;
        }
        return getConnection().isClosing();
    }

    /** Returns the connection this channel belongs to. */
    public AMQPConnection_0_8<?> getConnection() {
        return _connection;
    }

    /**
     * Applies new prefetch limits to the credit manager.
     *
     * <p>The prefetch values are logged once per channel. Flow logging is switched off when
     * {@code prefetchCount <= 1} and {@code prefetchSize == 0}. Consumers are told to
     * re-evaluate their work only if the credit manager's {@code hasCredit()} answer flips.
     *
     * @param prefetchSize  prefetch size limit passed to the credit manager
     * @param prefetchCount prefetch count limit passed to the credit manager
     */
    private void setCredit(long prefetchSize, int prefetchCount) {
        if (!_prefetchLoggedForChannel) {
            message(ChannelMessages.PREFETCH_SIZE((Number) prefetchSize, (Number) prefetchCount));
            _prefetchLoggedForChannel = true;
        }
        if (prefetchSize == 0L && prefetchCount <= 1) {
            _logChannelFlowMessages = false;
        }
        boolean creditBefore = _creditManager.hasCredit();
        _creditManager.setCreditLimits(prefetchSize, prefetchCount);
        boolean creditAfter = _creditManager.hasCredit();
        if (creditBefore != creditAfter) {
            updateAllConsumerNotifyWorkDesired();
        }
    }

    /** Returns the delivery method configured for this channel. */
    public ClientDeliveryMethod getClientDeliveryMethod() {
        return _clientDeliveryMethod;
    }

    /** Returns the {@link Subject} held by this channel. */
    public Subject getSubject() {
        return _subject;
    }

    /**
     * @return {@code true} when {@code _currentMessage} is set
     */
    private boolean hasCurrentMessage() {
        return null != _currentMessage;
    }

    /** Returns the current value of the channel-flow flag. */
    public boolean isChannelFlow() {
        return _channelFlow;
    }

    /**
     * Registers the channel itself as a blocking entity. If that is a new entry and the
     * channel was not yet blocking, logs the enforcement and notifies the connection of work.
     */
    public synchronized void block() {
        if (!_blockingEntities.add(this)) {
            return;
        }
        if (_blocking.compareAndSet(false, true)) {
            messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
            getConnection().notifyWork((AMQPSession) this);
        }
    }

    /**
     * Removes the channel itself from the blocking entities. If it was present, no blocking
     * entities remain and the channel was blocking, logs the removal and notifies the
     * connection of work.
     */
    public synchronized void unblock() {
        if (!_blockingEntities.remove(this)) {
            return;
        }
        if (_blockingEntities.isEmpty() && _blocking.compareAndSet(true, false)) {
            messageWithSubject(ChannelMessages.FLOW_REMOVED());
            getConnection().notifyWork((AMQPSession) this);
        }
    }

    /**
     * Registers a queue as a blocking entity. If that is a new entry and the channel was
     * not yet blocking, logs the enforcement with the queue's name and notifies the
     * connection of work.
     *
     * @param queue the queue that is blocking this channel
     */
    public synchronized void block(Queue<?> queue) {
        if (!_blockingEntities.add(queue)) {
            return;
        }
        if (_blocking.compareAndSet(false, true)) {
            messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
            getConnection().notifyWork((AMQPSession) this);
        }
    }

    /**
     * Removes a queue from the blocking entities. If it was present, no blocking entities
     * remain, the channel was blocking and the channel is not closing, logs the removal and
     * notifies the connection of work.
     *
     * @param queue the queue that no longer blocks this channel
     */
    public synchronized void unblock(Queue<?> queue) {
        if (!_blockingEntities.remove(queue)) {
            return;
        }
        // Evaluation order matters: the blocking flag is flipped before isClosing() is consulted.
        if (_blockingEntities.isEmpty() && _blocking.compareAndSet(true, false) && !isClosing()) {
            messageWithSubject(ChannelMessages.FLOW_REMOVED());
            getConnection().notifyWork((AMQPSession) this);
        }
    }

    /**
     * Reacts to a transport state change: refreshes every consumer target, calls
     * {@code restoreCredit(0, 0)} on both credit managers, and notifies the connection of
     * work when consumers have pending work and the transport is not blocked for writing.
     */
    public void transportStateChanged() {
        updateAllConsumerNotifyWorkDesired();
        _creditManager.restoreCredit(0L, 0L);
        INFINITE_CREDIT_CREDIT_MANAGER.restoreCredit(0L, 0L);
        if (_consumersWithPendingWork.isEmpty()) {
            return;
        }
        if (!getAMQPConnection().isTransportBlockedForWriting()) {
            getAMQPConnection().notifyWork((AMQPSession) this);
        }
    }

    /** Asks every registered consumer target to re-evaluate whether it wants work notifications. */
    void updateAllConsumerNotifyWorkDesired() {
        _tag2SubscriptionTargetMap.values().forEach(target -> target.updateNotifyWorkDesired());
    }

    /** Returns the reference object exposed by this channel's connection. */
    public Object getConnectionReference() {
        AMQPConnection_0_8<?> connection = getConnection();
        return connection.getReference();
    }

    /** Returns the number of entries in the unacknowledged message map. */
    public int getUnacknowledgedMessageCount() {
        UnacknowledgedMessageMap unacked = getUnacknowledgedMessageMap();
        return unacked.size();
    }

    /**
     * Writes a channel-flow frame for this channel to the connection.
     *
     * @param flow the flag carried by the flow body
     */
    private void sendFlow(boolean flow) {
        ChannelFlowBody flowBody = _connection.getMethodRegistry().createChannelFlowBody(flow);
        _connection.writeFrame(flowBody.generateFrame(_channelId));
    }

    /** Returns the current value of the blocking flag. */
    public boolean getBlocking() {
        return _blocking.get();
    }

    /** Returns the address space of this channel's connection. */
    public NamedAddressSpace getAddressSpace() {
        AMQPConnection_0_8<?> connection = getConnection();
        return connection.getAddressSpace();
    }

    /**
     * Removes a delivery from the unacknowledged map and routes its message to the alternate
     * destination, logging the outcome.
     *
     * <p>When nothing was routed and the owning resource is a {@code Queue}, a discard message
     * is logged; which one depends on whether the queue has an alternate binding destination.
     *
     * @param deliveryTag the delivery tag of the message to dead-letter
     */
    private void deadLetter(long deliveryTag) {
        MessageConsumerAssociation association = getUnacknowledgedMessageMap().remove(deliveryTag, true);
        if (association == null) {
            LOGGER.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
            return;
        }
        MessageInstance instance = association.getMessageInstance();
        final ServerMessage msg = instance.getMessage();
        int routedCount = 0;
        if (instance.makeAcquisitionUnstealable(association.getConsumer())) {
            routedCount = instance.routeToAlternate((Action)new Action<MessageInstance>(){

                public void performAction(MessageInstance requeueEntry) {
                    AMQChannel.this.messageWithSubject(ChannelMessages.DEADLETTERMSG((Number)msg.getMessageNumber(), (String)requeueEntry.getOwningResource().getName()));
                }
            }, null, null);
        }
        if (routedCount != 0) {
            return;
        }
        TransactionLogResource owner = instance.getOwningResource();
        if (!(owner instanceof Queue)) {
            return;
        }
        Queue<?> queue = (Queue<?>) owner;
        MessageDestination alternate = queue.getAlternateBindingDestination();
        if (alternate != null) {
            messageWithSubject(ChannelMessages.DISCARDMSG_NOROUTE((Number) msg.getMessageNumber(), alternate.getName()));
        } else {
            messageWithSubject(ChannelMessages.DISCARDMSG_NOALTEXCH((Number) msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
        }
    }

    /**
     * Queues a future/action pair as an unfinished command; the queue is drained by {@code sync()}.
     *
     * @param future the future to record
     * @param action the action paired with the future
     */
    public void recordFuture(ListenableFuture<Void> future, ServerTransaction.Action action) {
        AsyncCommand pending = new AsyncCommand(future, action);
        _unfinishedCommandsQueue.add(pending);
    }

    /**
     * Completes every queued unfinished command in order, then calls {@code sync()} on the
     * transaction if it is a {@link LocalTransaction}.
     */
    private void sync() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("sync() called on channel " + debugIdentity());
        }
        for (AsyncCommand pending = _unfinishedCommandsQueue.poll(); pending != null; pending = _unfinishedCommandsQueue.poll()) {
            pending.complete();
        }
        if (_transaction instanceof LocalTransaction) {
            ((LocalTransaction) _transaction).sync();
        }
    }

    /**
     * @return the transaction's start time, or {@code 0} when the current transaction is not transactional
     */
    public long getTransactionStartTimeLong() {
        // Read the field once so both calls hit the same transaction object.
        ServerTransaction current = _transaction;
        return current.isTransactional() ? current.getTransactionStartTime() : 0L;
    }

    /**
     * @return the transaction's update time, or {@code 0} when the current transaction is not transactional
     */
    public long getTransactionUpdateTimeLong() {
        // Read the field once so both calls hit the same transaction object.
        ServerTransaction current = _transaction;
        return current.isTransactional() ? current.getTransactionUpdateTime() : 0L;
    }

    /**
     * Handles an AccessRequest method. For protocol version 0-91 the connection is closed with
     * code 503; otherwise an AccessRequestOk carrying ticket 0 is written after a sync.
     */
    @Override
    public void receiveAccessRequest(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] AccessRequest[ realm: " + realm + " exclusive: " + exclusive + " passive: " + passive + " active: " + active + " write: " + write + " read: " + read + " ]");
        }
        MethodRegistry registry = _connection.getMethodRegistry();
        boolean unsupportedVersion = ProtocolVersion.v0_91.equals(_connection.getProtocolVersion());
        if (unsupportedVersion) {
            _connection.sendConnectionClose(503, "AccessRequest not present in AMQP versions other than 0-8, 0-9", _channelId);
            return;
        }
        AccessRequestOkBody okBody = registry.createAccessRequestOkBody(0);
        sync();
        _connection.writeFrame(okBody.generateFrame(_channelId));
    }

    /**
     * Handles a BasicAck: acknowledges the matching deliveries in the unacknowledged map and,
     * if any were found, dequeues their message instances through the current transaction.
     *
     * @param deliveryTag the acknowledged delivery tag
     * @param multiple    forwarded to the unacknowledged map's {@code acknowledge} call
     */
    @Override
    public void receiveBasicAck(long deliveryTag, boolean multiple) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicAck[ deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
        }
        Collection<MessageConsumerAssociation> acked = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
        if (acked.isEmpty()) {
            return;
        }
        Collection instances = Collections2.transform(acked, MESSAGE_INSTANCE_FUNCTION);
        ServerTransaction.Action postDequeue = (ServerTransaction.Action) new MessageAcknowledgeAction(acked);
        _transaction.dequeue(instances, postDequeue);
    }

    /**
     * Handles a BasicCancel: unsubscribes the consumer and, unless {@code nowait} is set,
     * replies with BasicCancelOk after a sync.
     *
     * @param consumerTag tag of the consumer to cancel
     * @param nowait      when {@code true} no reply frame is written
     */
    @Override
    public void receiveBasicCancel(AMQShortString consumerTag, boolean nowait) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicCancel[ consumerTag: " + consumerTag + " noWait: " + nowait + " ]");
        }
        unsubscribeConsumer(consumerTag);
        if (nowait) {
            return;
        }
        BasicCancelOkBody okBody = _connection.getMethodRegistry().createBasicCancelOkBody(consumerTag);
        sync();
        _connection.writeFrame(okBody.generateFrame(_channelId));
    }

    /**
     * Handles a BasicConsume: resolves the message source(s), creates the consumer and,
     * unless {@code nowait} is set, replies with BasicConsumeOk.
     *
     * <p>Sources come either from the {@code x-multiqueue} argument (when it is a collection)
     * or from the named queue, falling back to the default queue when no name is given. If
     * any multi-queue entry cannot be resolved, no sources are used at all. With no sources
     * the channel is closed with 404 when a name is known, otherwise the connection is
     * closed with 530. Failures while creating the consumer close the connection.
     *
     * @param queue       name of the queue to consume from, may be {@code null}
     * @param consumerTag requested consumer tag
     * @param noLocal     forwarded to {@code consumeFromSource}
     * @param noAck       when {@code true} the consumer is created without acknowledgements
     * @param exclusive   forwarded to {@code consumeFromSource}
     * @param nowait      when {@code true} no BasicConsumeOk is written
     * @param arguments   optional consumer arguments, may be {@code null}
     */
    @Override
    public void receiveBasicConsume(AMQShortString queue, AMQShortString consumerTag, boolean noLocal, boolean noAck, boolean exclusive, boolean nowait, FieldTable arguments) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicConsume[ queue: " + queue + " consumerTag: " + consumerTag + " noLocal: " + noLocal + " noAck: " + noAck + " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }
        AMQShortString consumerTag1 = consumerTag;
        NamedAddressSpace vHost = _connection.getAddressSpace();
        sync();
        String queueName = AMQShortString.toString(queue);
        MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName);
        Collection<MessageSource> sources = new HashSet<>();
        // Look the argument up once instead of on every use.
        Object multiQueue = arguments == null ? null : arguments.get("x-multiqueue");
        if (multiQueue instanceof Collection) {
            for (Object element : (Collection<?>) multiQueue) {
                String sourceName = String.valueOf(element).trim();
                if (sourceName.length() == 0) {
                    continue;
                }
                MessageSource source = vHost.getAttainedMessageSource(sourceName);
                if (source == null) {
                    // One unknown source invalidates the whole multi-queue request.
                    sources.clear();
                    break;
                }
                sources.add(source);
            }
            queueName = multiQueue.toString();
        } else if (queue1 != null) {
            sources.add(queue1);
        }
        if (sources.isEmpty()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("No queue for '" + queueName + "'");
            }
            if (queueName != null) {
                closeChannel(404, "No such queue, '" + queueName + "'");
            } else {
                _connection.sendConnectionClose(530, "No queue name provided, no default queue defined.", _channelId);
            }
            return;
        }
        // queue1 is null when only x-multiqueue was supplied and there is no default queue;
        // fall back to the multi-queue description so the error handlers cannot throw NPE.
        final String subscribedName = queue1 != null ? queue1.getName() : queueName;
        try {
            consumerTag1 = consumeFromSource(consumerTag1, sources, !noAck, arguments, exclusive, noLocal);
            if (!nowait) {
                MethodRegistry methodRegistry = _connection.getMethodRegistry();
                BasicConsumeOkBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
                _connection.writeFrame(responseBody.generateFrame(_channelId));
            }
        }
        catch (ConsumerTagInUseException cte) {
            _connection.sendConnectionClose(530, "Non-unique consumer tag, '" + consumerTag1 + "'", _channelId);
        }
        catch (AMQInvalidArgumentException ise) {
            _connection.sendConnectionClose(409, ise.getMessage(), _channelId);
        }
        catch (MessageSource.ExistingExclusiveConsumer e) {
            _connection.sendConnectionClose(403, "Cannot subscribe to queue '" + subscribedName + "' as it already has an existing exclusive consumer", _channelId);
        }
        catch (MessageSource.ExistingConsumerPreventsExclusive e) {
            _connection.sendConnectionClose(403, "Cannot subscribe to queue '" + subscribedName + "' exclusively as it already has a consumer", _channelId);
        }
        catch (AccessControlException e) {
            _connection.sendConnectionClose(403, "Cannot subscribe to queue '" + subscribedName + "' permission denied", _channelId);
        }
        catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) {
            _connection.sendConnectionClose(403, "Cannot subscribe to queue '" + subscribedName + "' as it already has an incompatible exclusivity policy", _channelId);
        }
        catch (MessageSource.QueueDeleted queueDeleted) {
            _connection.sendConnectionClose(404, "Cannot subscribe to queue '" + subscribedName + "' as it has been deleted", _channelId);
        }
    }

    /**
     * Handles a BasicGet: resolves the queue (or the default queue when no name is given)
     * and performs a get on it, replying with BasicGetEmpty when {@code performGet} returns
     * {@code false}. An unresolved queue or a failure during the get closes the connection.
     *
     * @param queueName name of the queue to get from, may be {@code null}
     * @param noAck     when {@code true} the get is performed without acknowledgements
     */
    @Override
    public void receiveBasicGet(AMQShortString queueName, boolean noAck) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicGet[ queue: " + queueName + " noAck: " + noAck + " ]");
        }
        NamedAddressSpace addressSpace = _connection.getAddressSpace();
        sync();
        MessageSource source;
        if (queueName == null) {
            source = getDefaultQueue();
        } else {
            source = addressSpace.getAttainedMessageSource(queueName.toString());
        }
        if (source == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("No queue for '" + queueName + "'");
            }
            if (queueName == null) {
                _connection.sendConnectionClose(530, "No queue name provided, no default queue defined.", _channelId);
            } else {
                _connection.sendConnectionClose(404, "No such queue, '" + queueName + "'", _channelId);
            }
            return;
        }
        try {
            boolean delivered = performGet(source, !noAck);
            if (!delivered) {
                BasicGetEmptyBody emptyBody = _connection.getMethodRegistry().createBasicGetEmptyBody(null);
                _connection.writeFrame(emptyBody.generateFrame(_channelId));
            }
        }
        catch (AccessControlException e) {
            _connection.sendConnectionClose(403, e.getMessage(), _channelId);
        }
        catch (MessageSource.ExistingExclusiveConsumer e) {
            _connection.sendConnectionClose(530, "Queue has an exclusive consumer", _channelId);
        }
        catch (MessageSource.ExistingConsumerPreventsExclusive e) {
            _connection.sendConnectionClose(541, "The GET request has been evaluated as an exclusive consumer, this is likely due to a programming error in the Qpid broker", _channelId);
        }
        catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) {
            _connection.sendConnectionClose(530, "Queue has an incompatible exclusivity policy", _channelId);
        }
        catch (MessageSource.QueueDeleted queueDeleted) {
            _connection.sendConnectionClose(404, "Queue has been deleted", _channelId);
        }
    }

    /**
     * Handles a BasicPublish: resolves the destination and records the publish info for the
     * message that follows.
     *
     * <p>The channel is closed with 311 when the blocking timeout has been exceeded, and with
     * 404 when the exchange cannot be resolved. An {@link AccessControlException} from
     * {@code setPublishFrame} closes the connection with 403.
     */
    @Override
    public void receiveBasicPublish(AMQShortString exchangeName, AMQShortString routingKey, boolean mandatory, boolean immediate) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicPublish[ exchange: " + exchangeName + " routingKey: " + routingKey + " mandatory: " + mandatory + " immediate: " + immediate + " ]");
        }
        NamedAddressSpace addressSpace = _connection.getAddressSpace();
        if (blockingTimeoutExceeded()) {
            message(ChannelMessages.FLOW_CONTROL_IGNORED());
            closeChannel(311, "Channel flow control was requested, but not enforced by sender");
            return;
        }
        MessageDestination target;
        if (isDefaultExchange(exchangeName)) {
            target = addressSpace.getDefaultDestination();
        } else {
            target = addressSpace.getAttainedMessageDestination(exchangeName.toString(), true);
        }
        if (target == null) {
            closeChannel(404, "Unknown exchange name: '" + exchangeName + "'");
            return;
        }
        MessagePublishInfo publishInfo = new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey);
        try {
            setPublishFrame(publishInfo, target);
        }
        catch (AccessControlException e) {
            _connection.sendConnectionClose(403, e.getMessage(), getChannelId());
        }
    }

    /**
     * @return {@code true} when the wire-blocking flag is set and more than
     *         {@code _blockingTimeout} milliseconds have passed since {@code _blockTime}
     */
    private boolean blockingTimeoutExceeded() {
        if (!_wireBlockingState) {
            return false;
        }
        return System.currentTimeMillis() - _blockTime > _blockingTimeout;
    }

    /**
     * Handles a BasicQos: syncs, applies the prefetch limits and replies with BasicQosOk.
     *
     * @param prefetchSize  requested prefetch size
     * @param prefetchCount requested prefetch count
     * @param global        only logged; not otherwise used by this method
     */
    @Override
    public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicQos[ prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]");
        }
        sync();
        setCredit(prefetchSize, prefetchCount);
        BasicQosOkBody okBody = _connection.getMethodRegistry().createBasicQosOkBody();
        _connection.writeFrame(okBody.generateFrame(getChannelId()));
    }

    /**
     * Handles a BasicRecover: requeues or re-sends all unacknowledged messages and, when
     * {@code sync} is set, replies with BasicRecoverSyncOk after a sync.
     *
     * @param requeue {@code true} to requeue, {@code false} to re-send
     * @param sync    whether a reply frame is written
     */
    @Override
    public void receiveBasicRecover(boolean requeue, boolean sync) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicRecover[ requeue: " + requeue + " sync: " + sync + " ]");
        }
        if (!requeue) {
            resend();
        } else {
            requeue();
        }
        if (!sync) {
            return;
        }
        BasicRecoverSyncOkBody okBody = _connection.getMethodRegistry().createBasicRecoverSyncOkBody();
        sync();
        _connection.writeFrame(okBody.generateFrame(getChannelId()));
    }

    /**
     * Handles a BasicReject for a single delivery.
     *
     * <p>With {@code requeue} the delivery count is decremented and the message requeued.
     * Otherwise, if a maximum delivery count is enabled, the message is dead-lettered once
     * the limit is reached, or else only has its delivery count incremented; when no maximum
     * is enabled the message is requeued.
     *
     * @param deliveryTag tag of the rejected delivery
     * @param requeue     whether the client asked for the message to be requeued
     */
    @Override
    public void receiveBasicReject(long deliveryTag, boolean requeue) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicReject[ deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]");
        }
        MessageInstance rejected = getUnacknowledgedMessageMap().get(deliveryTag);
        if (rejected == null) {
            LOGGER.warn("Dropping reject request as message is null for tag:" + deliveryTag);
            return;
        }
        if (rejected.getMessage() == null) {
            LOGGER.warn("Message has already been purged, unable to Reject.");
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Rejecting: DT:" + deliveryTag + "-" + rejected.getMessage() + ": Requeue:" + requeue + " on channel:" + debugIdentity());
        }
        if (requeue) {
            rejected.decrementDeliveryCount();
            requeue(deliveryTag);
            return;
        }
        boolean limitEnabled = isMaxDeliveryCountEnabled(deliveryTag);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("maxDeliveryCountEnabled: " + limitEnabled + " deliveryTag " + deliveryTag);
        }
        if (!limitEnabled) {
            requeue(deliveryTag);
            return;
        }
        boolean limitReached = isDeliveredTooManyTimes(deliveryTag);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("deliveredTooManyTimes: " + limitReached + " deliveryTag " + deliveryTag);
        }
        if (limitReached) {
            deadLetter(deliveryTag);
        } else {
            // The message stays in the unacknowledged map; only its counter is adjusted.
            rejected.incrementDeliveryCount();
        }
    }

    /**
     * Handles a ChannelClose: syncs, closes this channel on the connection and replies with
     * ChannelCloseOk.
     */
    @Override
    public void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] ChannelClose[ replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
        }
        sync();
        _connection.closeChannel(this);
        AMQFrame closeOkFrame = new AMQFrame(getChannelId(), _connection.getMethodRegistry().createChannelCloseOkBody());
        _connection.writeFrame(closeOkFrame);
    }

    /** Handles a ChannelCloseOk by passing this channel's id to the connection's {@code closeChannelOk}. */
    @Override
    public void receiveChannelCloseOk() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] ChannelCloseOk");
        }
        int channelId = getChannelId();
        _connection.closeChannelOk(channelId);
    }

    /**
     * Handles a content body frame. The content is published when a message is in progress;
     * otherwise the connection is closed with 503.
     *
     * @param data the content bytes received
     */
    @Override
    public void receiveMessageContent(QpidByteBuffer data) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] MessageContent[ data: " + Functions.hex((QpidByteBuffer)data, (int)_connection.getBinaryDataLimit()) + " ] ");
        }
        if (!hasCurrentMessage()) {
            // NOTE(review): the text mentions a "content header" although this is the content-body path.
            _connection.sendConnectionClose(503, "Attempt to send a content header without first sending a publish frame", _channelId);
            return;
        }
        publishContentBody(new ContentBody(data));
    }

    /**
     * Handles a content header frame.
     *
     * <p>Without a message in progress the connection is closed with 503. A body size above
     * the connection's maximum closes the channel with 311. When message validation is forced
     * and the properties are invalid the connection is closed with 501. In every rejecting
     * path the properties are disposed first.
     *
     * @param properties the content header properties
     * @param bodySize   the declared body size
     */
    @Override
    public void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]");
        }
        if (!hasCurrentMessage()) {
            properties.dispose();
            _connection.sendConnectionClose(503, "Attempt to send a content header without first sending a publish frame", _channelId);
            return;
        }
        if (bodySize > _connection.getMaxMessageSize()) {
            properties.dispose();
            closeChannel(311, "Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize());
            return;
        }
        // checkValid() is consulted only when validation is forced.
        if (_forceMessageValidation && !properties.checkValid()) {
            properties.dispose();
            _connection.sendConnectionClose(501, "Attempt to send a malformed content header", _channelId);
            return;
        }
        publishContentHeader(new ContentHeaderBody(properties, bodySize));
    }

    /**
     * @return {@code true} when the connection is ignoring everything but close-ok, or when
     *         the connection reports this channel as awaiting closure
     */
    @Override
    public boolean ignoreAllButCloseOk() {
        if (_connection.ignoreAllButCloseOk()) {
            return true;
        }
        return _connection.channelAwaitingClosure(_channelId);
    }

    /**
     * Handles a BasicNack for one or several deliveries.
     *
     * <p>The affected deliveries are collected from the unacknowledged map and each one is
     * processed under its own delivery tag. With {@code requeue} the delivery count is
     * decremented and the message requeued. Otherwise the message is rejected for its
     * consumer and then dead-lettered if a maximum delivery count is enabled and reached,
     * released with an incremented delivery count if enabled but not reached, or requeued
     * when no maximum is enabled.
     *
     * @param deliveryTag the tag named in the nack
     * @param multiple    forwarded to the unacknowledged map's {@code collect} call
     * @param requeue     whether the client asked for the messages to be requeued
     */
    @Override
    public void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + _channelId + "] BasicNack[ deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
        }
        Map<Long, MessageConsumerAssociation> nackedMessageMap = new LinkedHashMap<>();
        _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
        for (Map.Entry<Long, MessageConsumerAssociation> nacked : nackedMessageMap.entrySet()) {
            // Use the tag of the entry being processed, not the tag named in the request:
            // with multiple=true the map holds several deliveries under different tags.
            long nackedTag = nacked.getKey();
            MessageConsumerAssociation association = nacked.getValue();
            if (association == null) {
                LOGGER.warn("Ignoring nack request as message is null for tag:" + nackedTag);
                continue;
            }
            MessageInstance message = association.getMessageInstance();
            if (message.getMessage() == null) {
                LOGGER.warn("Message has already been purged, unable to nack.");
                continue;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Nack-ing: DT:" + nackedTag + "-" + message.getMessage() + ": Requeue:" + requeue + " on channel:" + debugIdentity());
            }
            if (requeue) {
                message.decrementDeliveryCount();
                requeue(nackedTag);
                continue;
            }
            message.reject(association.getConsumer());
            boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(nackedTag);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + nackedTag);
            }
            if (!maxDeliveryCountEnabled) {
                requeue(nackedTag);
                continue;
            }
            boolean deliveredTooManyTimes = isDeliveredTooManyTimes(nackedTag);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + nackedTag);
            }
            if (deliveredTooManyTimes) {
                deadLetter(nackedTag);
            } else {
                message.incrementDeliveryCount();
                message.release(association.getConsumer());
            }
        }
    }

    /**
     * Handles a Channel.Flow frame: records the requested flow state, suspends or resumes the
     * channel accordingly and replies with Channel.FlowOk.
     *
     * @param active the flow state requested by the peer
     */
    @Override
    public void receiveChannelFlow(boolean active) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] ChannelFlow[ active: " + active + " ]");
        }
        this.sync();
        final boolean flowStateChanged = active != this._channelFlow;
        if (flowStateChanged) {
            this._channelFlow = active;
            this.updateAllConsumerNotifyWorkDesired();
        }
        this.setSuspended(!active);
        final ChannelFlowOkBody flowOk = this._connection.getMethodRegistry().createChannelFlowOkBody(active);
        this._connection.writeFrame(flowOk.generateFrame(this.getChannelId()));
    }

    /**
     * Handles a Channel.FlowOk frame. The frame is only logged (at debug level); no state changes.
     *
     * @param active the flow state echoed by the peer
     */
    @Override
    public void receiveChannelFlowOk(boolean active) {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }
        LOGGER.debug("RECV[" + this._channelId + "] ChannelFlowOk[ active: " + active + " ]");
    }

    /**
     * Handles an Exchange.Bound query and replies with Exchange.BoundOk carrying a numeric reply
     * code and an optional reply text.
     *
     * <p>Reply codes produced here: 0 success, 1 exchange not found, 2 queue not found,
     * 3 no bindings / no message sources, 4 queue not bound to the exchange, 5 nothing bound with
     * the routing key, 6 queue not bound with the routing key.
     *
     * @param exchangeName the exchange to inspect; the default exchange is handled separately
     * @param routingKey   routing key to test, or {@code null} to ignore routing keys
     * @param queueName    queue to test, or {@code null} to ignore queues
     */
    @Override
    public void receiveExchangeBound(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] ExchangeBound[ exchange: " + exchangeName + " routingKey: " + routingKey + " queue: " + queueName + " ]");
        }
        final NamedAddressSpace addressSpace = this._connection.getAddressSpace();
        final MethodRegistry registry = this._connection.getMethodRegistry();
        this.sync();

        final boolean noRoutingKey = routingKey == null;
        final boolean noQueueName = queueName == null;
        final int replyCode;
        String replyText = null;

        if (this.isDefaultExchange(exchangeName)) {
            if (noRoutingKey && noQueueName) {
                replyCode = addressSpace.hasMessageSources() ? 0 : 3;
            } else if (noRoutingKey) {
                // Only a queue name was given: it must resolve to a message source.
                MessageSource namedSource = addressSpace.getAttainedMessageSource(queueName.toString());
                if (namedSource == null) {
                    replyCode = 2;
                    replyText = "Queue '" + queueName + "' not found";
                } else {
                    replyCode = 0;
                }
            } else if (noQueueName) {
                // Only a routing key was given: it must name a queue destination.
                boolean keyNamesQueue = addressSpace.getAttainedMessageDestination(routingKey.toString(), false) instanceof Queue;
                replyCode = keyNamesQueue ? 0 : 5;
            } else {
                MessageDestination namedDestination = addressSpace.getAttainedMessageDestination(queueName.toString(), false);
                if (namedDestination instanceof Queue) {
                    replyCode = queueName.equals(routingKey) ? 0 : 6;
                } else {
                    replyCode = 2;
                    replyText = "Queue '" + queueName + "' not found";
                }
            }
        } else {
            MessageDestination destination = this.getAddressSpace().getAttainedMessageDestination(exchangeName.toString(), true);
            if (!(destination instanceof Exchange)) {
                replyCode = 1;
                replyText = "Exchange '" + exchangeName + "' not found";
            } else {
                Exchange exchange = (Exchange)destination;
                if (noQueueName) {
                    if (noRoutingKey) {
                        replyCode = exchange.hasBindings() ? 0 : 3;
                    } else if (exchange.isBound(routingKey.toString())) {
                        replyCode = 0;
                    } else {
                        replyCode = 5;
                        replyText = "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
                    }
                } else {
                    Queue<?> queue = this.getQueue(queueName.toString());
                    if (queue == null) {
                        replyCode = 2;
                        replyText = "Queue '" + queueName + "' not found";
                    } else if (noRoutingKey) {
                        if (exchange.isBound(queue)) {
                            replyCode = 0;
                        } else {
                            replyCode = 4;
                            replyText = "Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'";
                        }
                    } else if (exchange.isBound(routingKey.toString(), queue)) {
                        replyCode = 0;
                    } else {
                        replyCode = 6;
                        replyText = "Queue '" + queueName + "' not bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
                    }
                }
            }
        }

        ExchangeBoundOkBody boundOk = registry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
        this._connection.writeFrame(boundOk.generateFrame(this.getChannelId()));
    }

    /**
     * Handles an Exchange.Declare frame.
     *
     * <p>Three cases are distinguished: a declare of the default exchange (only accepted with type
     * "direct"), a passive declare (lookup only) and an active declare (creates the exchange from
     * the wire arguments). On success Exchange.DeclareOk is written unless {@code nowait} is set;
     * failures close either the channel or the connection with the codes used below.
     *
     * @param exchangeName name of the exchange being declared
     * @param type         requested exchange type, may be {@code null}
     * @param passive      when {@code true} only checks that the exchange exists (and matches the type, if given)
     * @param durable      stored as the "durable" attribute of a newly created exchange
     * @param autoDelete   selects the lifetime policy of a newly created exchange
     * @param internal     only logged by this method
     * @param nowait       when {@code true} no DeclareOk is sent
     * @param arguments    additional declare arguments, may be {@code null}
     */
    @Override
    public void receiveExchangeDeclare(AMQShortString exchangeName, AMQShortString type, boolean passive, boolean durable, boolean autoDelete, boolean internal, boolean nowait, FieldTable arguments) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] ExchangeDeclare[ exchange: " + exchangeName + " type: " + type + " passive: " + passive + " durable: " + durable + " autoDelete: " + autoDelete + " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }
        final MethodRegistry registry = this._connection.getMethodRegistry();
        final ExchangeDeclareOkBody okBody = registry.createExchangeDeclareOkBody();
        final NamedAddressSpace addressSpace = this._connection.getAddressSpace();

        // Shared success path: sync and confirm, unless the client asked for no reply.
        final Runnable confirmDeclare = () -> {
            if (nowait) {
                return;
            }
            this.sync();
            this._connection.writeFrame(okBody.generateFrame(this.getChannelId()));
        };

        if (this.isDefaultExchange(exchangeName)) {
            if (AMQShortString.createAMQShortString("direct").equals(type)) {
                confirmDeclare.run();
            } else {
                this._connection.sendConnectionClose(530, "Attempt to redeclare default exchange:  of type direct to " + type + ".", this.getChannelId());
            }
            return;
        }

        if (passive) {
            final Exchange<?> known = this.getExchange(exchangeName.toString());
            if (known == null) {
                this.closeChannel(404, "Unknown exchange: '" + exchangeName + "'");
            } else if (type != null && type.length() != 0 && !known.getType().equals(type.toString())) {
                this._connection.sendConnectionClose(530, "Attempt to redeclare exchange: '" + exchangeName + "' of type " + known.getType() + " to " + type + ".", this.getChannelId());
            } else {
                confirmDeclare.run();
            }
            return;
        }

        final String name = exchangeName.toString();
        final String typeString = type == null ? null : type.toString();
        try {
            final HashMap<String, Object> attributes = new HashMap<String, Object>();
            if (arguments != null) {
                attributes.putAll(FieldTable.convertToMap(arguments));
            }
            attributes.put("name", name);
            attributes.put("type", typeString);
            attributes.put("durable", durable);
            attributes.put("lifetimePolicy", autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
            // The wire-level alternate exchange argument is replaced by an "alternateBinding" attribute.
            final Object alternateExchange = attributes.remove(ALTERNATE_EXCHANGE);
            if (alternateExchange != null) {
                final String alternateExchangeName = String.valueOf(alternateExchange);
                this.validateAlternateExchangeIsNotQueue(addressSpace, alternateExchangeName);
                attributes.put("alternateBinding", Collections.singletonMap("destination", alternateExchangeName));
            }
            this.validateAndSanitizeExchangeDeclareArguments(attributes);
            Exchange created = (Exchange)addressSpace.createMessageDestination(Exchange.class, attributes);
            confirmDeclare.run();
        }
        catch (ReservedExchangeNameException e) {
            // A reserved name is tolerated only when an exchange of that name and type already exists.
            final Exchange<?> reserved = this.getExchange(name);
            if (reserved != null && reserved.getType().equals(typeString)) {
                confirmDeclare.run();
            } else {
                this._connection.sendConnectionClose(530, "Attempt to declare exchange: '" + exchangeName + "' which begins with reserved prefix.", this.getChannelId());
            }
        }
        catch (AbstractConfiguredObject.DuplicateNameException e) {
            // Re-declaring an existing exchange succeeds only when the type matches.
            final Exchange duplicate = (Exchange)e.getExisting();
            if (duplicate.getType().equals(typeString)) {
                confirmDeclare.run();
            } else {
                this._connection.sendConnectionClose(530, "Attempt to redeclare exchange: '" + exchangeName + "' of type " + duplicate.getType() + " to " + type + ".", this.getChannelId());
            }
        }
        catch (NoFactoryForTypeException e) {
            this._connection.sendConnectionClose(503, "Unknown exchange type '" + e.getType() + "' for exchange '" + exchangeName + "'", this.getChannelId());
        }
        catch (AccessControlException e) {
            this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
        }
        catch (UnknownAlternateBindingException e) {
            final String message = String.format("Unknown alternate destination '%s'", e.getAlternateBindingName());
            this._connection.sendConnectionClose(404, message, this.getChannelId());
        }
        catch (IllegalArgumentException | IllegalConfigurationException e) {
            this._connection.sendConnectionClose(542, "Error creating exchange '" + exchangeName + "': " + e.getMessage(), this.getChannelId());
        }
    }

    /**
     * Checks the keys of {@code attributes} against the attributes the type registry knows for
     * {@code Exchange} and deals with any key that matches no non-derived attribute.
     *
     * <p>The set of known attributes is the registry's attribute types for {@code Exchange} plus the
     * type-specific attributes of every {@code Exchange} specialisation. What happens to unmatched
     * keys is selected by the connection context value
     * {@code "exchange.behaviourOnUnknownDeclareArgument"}: {@code LOG} warns and removes them,
     * {@code IGNORE} removes them silently, any other value throws.
     *
     * @param attributes exchange attributes to check; unsupported keys may be removed in place
     * @throws IllegalArgumentException if unsupported keys are present and the configured behaviour
     *                                  is neither {@code LOG} nor {@code IGNORE}
     */
    private void validateAndSanitizeExchangeDeclareArguments(Map<String, Object> attributes) {
        ConfiguredObjectTypeRegistry typeRegistry = this.getModel().getTypeRegistry();
        // Start with the attributes common to Exchange, then add those of each specialisation.
        ArrayList types = new ArrayList(typeRegistry.getAttributeTypes(Exchange.class).values());
        typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
        // A key is unsupported when no known attribute has that name and is non-derived.
        Set unsupported = attributes.keySet().stream().filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName()) && !a.isDerived())).collect(Collectors.toSet());
        if (!unsupported.isEmpty()) {
            Exchange.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour = (Exchange.BehaviourOnUnknownDeclareArgument)this.getConnection().getContextValue(Exchange.BehaviourOnUnknownDeclareArgument.class, "exchange.behaviourOnUnknownDeclareArgument");
            switch (unknownArgumentBehaviour) {
                case LOG: {
                    LOGGER.warn("Unsupported exchange declare arguments : {}", (Object)String.join((CharSequence)",", unsupported));
                    // No break here: control falls through into IGNORE, so the logged keys are removed as well.
                }
                case IGNORE: {
                    attributes.keySet().removeAll(unsupported);
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("Unsupported exchange declare arguments : %s", String.join((CharSequence)",", unsupported)));
                }
            }
        }
    }

    /**
     * Rejects an alternate exchange name that resolves to a destination which is not an exchange.
     * A name that resolves to nothing is accepted by this check.
     *
     * @param addressSpace          address space used to resolve the name
     * @param alternateExchangeName name supplied as the alternate exchange
     * @throws IllegalConfigurationException if the name resolves to a non-exchange destination
     */
    private void validateAlternateExchangeIsNotQueue(NamedAddressSpace addressSpace, String alternateExchangeName) {
        final MessageDestination resolved = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);
        if (resolved == null || resolved instanceof Exchange) {
            return;
        }
        throw new IllegalConfigurationException(String.format("Alternate exchange '%s' is not a destination of type 'exchange'.", alternateExchangeName));
    }

    /**
     * Handles an Exchange.Delete frame: deletes the named exchange and, unless {@code nowait} is
     * set, replies with Exchange.DeleteOk.
     *
     * @param exchangeStr name of the exchange to delete; the default exchange is refused
     * @param ifUnused    when {@code true} the delete is refused if the exchange has bindings
     * @param nowait      when {@code true} no DeleteOk is sent
     */
    @Override
    public void receiveExchangeDelete(AMQShortString exchangeStr, boolean ifUnused, boolean nowait) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] ExchangeDelete[ exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]");
        }
        // Value is not used below; the lookup is kept so the call sequence is unchanged.
        NamedAddressSpace addressSpace = this._connection.getAddressSpace();
        this.sync();

        if (this.isDefaultExchange(exchangeStr)) {
            this._connection.sendConnectionClose(530, "Default Exchange cannot be deleted", this.getChannelId());
            return;
        }

        final Exchange<?> target = this.getExchange(exchangeStr.toString());
        if (target == null) {
            this.closeChannel(404, "No such exchange: '" + exchangeStr + "'");
            return;
        }
        if (ifUnused && target.hasBindings()) {
            this.closeChannel(406, "Exchange has bindings");
            return;
        }

        try {
            target.delete();
            if (nowait) {
                return;
            }
            ExchangeDeleteOkBody deleteOk = this._connection.getMethodRegistry().createExchangeDeleteOkBody();
            this._connection.writeFrame(deleteOk.generateFrame(this.getChannelId()));
        }
        catch (MessageDestinationIsAlternateException e) {
            this.closeChannel(530, "Exchange in use as an alternate binding destination");
        }
        catch (RequiredExchangeException e) {
            this.closeChannel(530, "Exchange '" + exchangeStr + "' cannot be deleted");
        }
        catch (AccessControlException e) {
            this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
        }
    }

    /**
     * Handles a Queue.Bind frame: binds a queue to an exchange and, unless {@code nowait} is set,
     * replies with Queue.BindOk.
     *
     * <p>When {@code queueName} is {@code null} the channel's default queue is used, and a
     * {@code null} binding key then defaults to that queue's name. If the binding is rejected
     * because of an invalid argument the connection is closed and nothing further is sent for
     * this request.
     *
     * @param queueName      queue to bind, or {@code null} for the channel's default queue
     * @param exchange       exchange to bind to; binding to the default exchange is refused
     * @param bindingKey     binding key, may be {@code null}
     * @param nowait         when {@code true} no BindOk is sent
     * @param argumentsTable binding arguments
     */
    @Override
    public void receiveQueueBind(AMQShortString queueName, AMQShortString exchange, AMQShortString bindingKey, boolean nowait, FieldTable argumentsTable) {
        Queue<?> queue;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] QueueBind[ queue: " + queueName + " exchange: " + exchange + " bindingKey: " + bindingKey + " nowait: " + nowait + " arguments: " + argumentsTable + " ]");
        }
        if (queueName == null) {
            queue = this.getDefaultQueue();
            if (queue != null && bindingKey == null) {
                bindingKey = AMQShortString.valueOf(queue.getName());
            }
        } else {
            queue = this.getQueue(queueName.toString());
        }
        if (queue == null) {
            String message = queueName == null ? "No default queue defined on channel and queue was null" : "Queue " + queueName + " does not exist.";
            this.closeChannel(404, message);
        } else if (this.isDefaultExchange(exchange)) {
            // Report the resolved queue's name: queueName is null when the default queue was used.
            this._connection.sendConnectionClose(530, "Cannot bind the queue '" + queue.getName() + "' to the default exchange", this.getChannelId());
        } else {
            String exchangeName = exchange.toString();
            Exchange<?> exch = this.getExchange(exchangeName);
            if (exch == null) {
                this.closeChannel(404, "Exchange '" + exchangeName + "' does not exist.");
            } else {
                try {
                    Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
                    String bindingKeyStr = bindingKey == null ? "" : AMQShortString.toString(bindingKey);
                    if (!exch.isBound(bindingKeyStr, arguments, queue)) {
                        try {
                            // For topic exchanges an add that reports false is retried as a replace.
                            if (!exch.addBinding(bindingKeyStr, queue, arguments) && "topic".equals(exch.getType())) {
                                exch.replaceBinding(bindingKeyStr, queue, arguments);
                            }
                        }
                        catch (AMQInvalidArgumentException e) {
                            this._connection.sendConnectionClose(409, String.format("Cannot bind queue '%s' to exchange '%s' due to invalid argument : %s", queue.getName(), exch.getName(), e.getMessage()), this.getChannelId());
                            // The bind failed and the connection is closing: do not log success or send BindOk.
                            return;
                        }
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Binding queue " + queue + " to exchange " + exch + " with routing key " + bindingKeyStr);
                    }
                    if (!nowait) {
                        this.sync();
                        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                        QueueBindOkBody responseBody = methodRegistry.createQueueBindOkBody();
                        this._connection.writeFrame(responseBody.generateFrame(this.getChannelId()));
                    }
                }
                catch (AccessControlException e) {
                    this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
                }
            }
        }
    }

    /**
     * Handles a Queue.Declare frame.
     *
     * <p>A passive declare only looks the queue up; an active declare creates it from the wire
     * arguments, and an already existing queue of the same name is accepted when the requested
     * exclusivity and lifetime are compatible with it. On success the queue becomes the channel's
     * default queue and Queue.DeclareOk is written unless {@code nowait} is set.
     *
     * @param queueStr   requested queue name; {@code null} or empty yields a generated name
     * @param passive    when {@code true} the queue is only looked up, never created
     * @param durable    stored as the "durable" attribute and used when deriving the policies
     * @param exclusive  whether an exclusive queue is requested
     * @param autoDelete whether an auto-deleting queue is requested
     * @param nowait     when {@code true} no DeclareOk is sent
     * @param arguments  additional declare arguments
     */
    @Override
    public void receiveQueueDeclare(AMQShortString queueStr, boolean passive, boolean durable, boolean exclusive, boolean autoDelete, boolean nowait, FieldTable arguments) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] QueueDeclare[ queue: " + queueStr + " passive: " + passive + " durable: " + durable + " exclusive: " + exclusive + " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }
        NamedAddressSpace virtualHost = this._connection.getAddressSpace();
        // A missing or empty name is replaced by a generated "tmp_<uuid>" name.
        AMQShortString queueName = queueStr == null || queueStr.length() == 0 ? AMQShortString.createAMQShortString("tmp_" + UUID.randomUUID()) : queueStr;
        if (passive) {
            // Passive declare: the queue must already exist and be accessible from this session.
            Queue<?> queue = this.getQueue(queueName.toString());
            if (queue == null) {
                this.closeChannel(404, "Queue: '" + queueName + "' not found on VirtualHost '" + virtualHost.getName() + "'.");
            } else if (!queue.verifySessionAccess((AMQPSession)this)) {
                this._connection.sendConnectionClose(530, "Queue '" + queue.getName() + "' is exclusive, but not created on this Connection.", this.getChannelId());
            } else {
                this.setDefaultQueue(queue);
                if (!nowait) {
                    this.sync();
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, queue.getQueueDepthMessages(), queue.getConsumerCount());
                    this._connection.writeFrame(responseBody.generateFrame(this.getChannelId()));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Queue " + queueName + " declared successfully");
                    }
                }
            }
        } else {
            try {
                ExclusivityPolicy exclusivityPolicy;
                LifetimePolicy lifetimePolicy;
                String queueNameString = AMQShortString.toString(queueName);
                Map<String, Object> wireArguments = FieldTable.convertToMap(arguments);
                // An alternate exchange argument, if present, must not resolve to a non-exchange destination.
                Object alternateExchange = wireArguments.get(ALTERNATE_EXCHANGE);
                if (alternateExchange != null) {
                    String alternateExchangeName = String.valueOf(alternateExchange);
                    this.validateAlternateExchangeIsNotQueue(virtualHost, alternateExchangeName);
                }
                Queue.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour = (Queue.BehaviourOnUnknownDeclareArgument)this.getConnection().getContextValue(Queue.BehaviourOnUnknownDeclareArgument.class, "queue.behaviourOnUnknownDeclareArgument");
                Map attributes = QueueArgumentsConverter.convertWireArgsToModel((String)queueNameString, wireArguments, (Model)this.getModel(), (Queue.BehaviourOnUnknownDeclareArgument)unknownArgumentBehaviour);
                attributes.put("name", queueNameString);
                attributes.put("durable", durable);
                // Derive default lifetime and exclusivity policies from the exclusive/autoDelete/durable flags.
                if (exclusive) {
                    lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : (durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
                    exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
                } else {
                    lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
                    exclusivityPolicy = ExclusivityPolicy.NONE;
                }
                // The derived policies are only defaults: values already present in the converted attributes win.
                if (!attributes.containsKey("exclusive")) {
                    attributes.put("exclusive", exclusivityPolicy);
                }
                if (!attributes.containsKey("lifetimePolicy")) {
                    attributes.put("lifetimePolicy", lifetimePolicy);
                }
                Queue queue = (Queue)virtualHost.createMessageSource(Queue.class, attributes);
                this.setDefaultQueue(queue);
                if (!nowait) {
                    this.sync();
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, queue.getQueueDepthMessages(), queue.getConsumerCount());
                    this._connection.writeFrame(responseBody.generateFrame(this.getChannelId()));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Queue " + queueName + " declared successfully");
                    }
                }
            }
            catch (AbstractConfiguredObject.DuplicateNameException qe) {
                // A queue of this name already exists: accept the declare only if it is accessible from
                // this session and the requested exclusivity and lifetime are compatible with it.
                Queue queue = (Queue)qe.getExisting();
                if (!queue.verifySessionAccess((AMQPSession)this)) {
                    this._connection.sendConnectionClose(530, "Queue '" + queue.getName() + "' is exclusive, but not created on this Connection.", this.getChannelId());
                } else if (queue.isExclusive() != exclusive) {
                    this.closeChannel(405, "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + queue.isExclusive() + " requested " + exclusive + ")");
                } else if (autoDelete && queue.getLifetimePolicy() == LifetimePolicy.PERMANENT || !autoDelete && queue.getLifetimePolicy() != (exclusive && !durable ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)) {
                    // Mismatch: autoDelete requested on a PERMANENT queue, or no autoDelete requested while the
                    // existing policy differs from what the flags alone would have produced.
                    this.closeChannel(405, "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " + queue.getLifetimePolicy() + " requested autodelete: " + autoDelete + ")");
                } else {
                    this.setDefaultQueue(queue);
                    if (!nowait) {
                        this.sync();
                        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                        QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, queue.getQueueDepthMessages(), queue.getConsumerCount());
                        this._connection.writeFrame(responseBody.generateFrame(this.getChannelId()));
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Queue " + queueName + " declared successfully");
                        }
                    }
                }
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
            }
            catch (UnknownAlternateBindingException e) {
                String message = String.format("Unknown alternate destination: '%s'", e.getAlternateBindingName());
                this._connection.sendConnectionClose(404, message, this.getChannelId());
            }
            catch (IllegalArgumentException | IllegalConfigurationException e) {
                String message = String.format("Error creating queue '%s': %s", queueName, e.getMessage());
                this._connection.sendConnectionClose(542, message, this.getChannelId());
            }
        }
    }

    /**
     * Handles a Queue.Delete frame: deletes the queue and replies with Queue.DeleteOk carrying the
     * count returned by the delete.
     *
     * @param queueName queue to delete, or {@code null} for the channel's default queue
     * @param ifUnused  when {@code true} the delete is refused while the queue is in use
     * @param ifEmpty   when {@code true} the delete is refused while the queue is not empty
     * @param nowait    when {@code true} DeleteOk is sent only if the connection is configured to
     *                  send it regardless
     */
    @Override
    public void receiveQueueDelete(AMQShortString queueName, boolean ifUnused, boolean ifEmpty, boolean nowait) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] QueueDelete[ queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]");
        }
        // Value is not used below; the lookup is kept so the call sequence is unchanged.
        NamedAddressSpace addressSpace = this._connection.getAddressSpace();
        this.sync();

        final Queue<?> target;
        if (queueName == null) {
            target = this.getDefaultQueue();
        } else {
            target = this.getQueue(queueName.toString());
        }

        if (target == null) {
            this.closeChannel(404, "Queue '" + queueName + "' does not exist.");
            return;
        }
        if (ifEmpty && !target.isEmpty()) {
            this.closeChannel(406, "Queue: '" + queueName + "' is not empty.");
            return;
        }
        if (ifUnused && !target.isUnused()) {
            this.closeChannel(406, "Queue: '" + queueName + "' is still used.");
            return;
        }
        if (!target.verifySessionAccess((AMQPSession)this)) {
            this._connection.sendConnectionClose(530, "Queue '" + target.getName() + "' is exclusive, but not created on this Connection.", this.getChannelId());
            return;
        }

        try {
            final int purged = target.deleteAndReturnCount();
            final boolean replyWanted = !nowait || this._connection.isSendQueueDeleteOkRegardless();
            if (replyWanted) {
                QueueDeleteOkBody deleteOk = this._connection.getMethodRegistry().createQueueDeleteOkBody(purged);
                this._connection.writeFrame(deleteOk.generateFrame(this.getChannelId()));
            }
        }
        catch (AccessControlException e) {
            this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
        }
    }

    /**
     * Handles a Queue.Purge frame: clears the queue and, unless {@code nowait} is set, replies
     * with Queue.PurgeOk carrying the number returned by the clear.
     *
     * @param queueName queue to purge, or {@code null} for the channel's default queue
     * @param nowait    when {@code true} no PurgeOk is sent
     */
    @Override
    public void receiveQueuePurge(AMQShortString queueName, boolean nowait) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] QueuePurge[ queue: " + queueName + " nowait: " + nowait + " ]");
        }
        // Value is not used below; the lookup is kept so the call sequence is unchanged.
        NamedAddressSpace addressSpace = this._connection.getAddressSpace();

        final boolean useDefaultQueue = queueName == null;
        final Queue<?> target = useDefaultQueue ? this.getDefaultQueue() : this.getQueue(queueName.toString());

        if (target == null) {
            // A missing default queue closes the connection; a missing named queue closes only the channel.
            if (useDefaultQueue) {
                this._connection.sendConnectionClose(530, "No queue specified.", this.getChannelId());
            } else {
                this.closeChannel(404, "Queue '" + queueName + "' does not exist.");
            }
            return;
        }
        if (!target.verifySessionAccess((AMQPSession)this)) {
            this._connection.sendConnectionClose(530, "Queue is exclusive, but not created on this Connection.", this.getChannelId());
            return;
        }

        try {
            final long purged = target.clearQueue();
            if (nowait) {
                return;
            }
            this.sync();
            QueuePurgeOkBody purgeOk = this._connection.getMethodRegistry().createQueuePurgeOkBody(purged);
            this._connection.writeFrame(purgeOk.generateFrame(this.getChannelId()));
        }
        catch (AccessControlException e) {
            this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
        }
    }

    /**
     * Handles a Queue.Unbind frame: removes the binding between a queue and an exchange and
     * replies with Queue.UnbindOk.
     *
     * @param queueName  queue to unbind, or {@code null} for the channel's default queue
     * @param exchange   exchange to unbind from; unbinding from the default exchange is refused
     * @param bindingKey binding key; {@code null} is treated as the empty string
     * @param arguments  only logged by this method
     */
    @Override
    public void receiveQueueUnbind(AMQShortString queueName, AMQShortString exchange, AMQShortString bindingKey, FieldTable arguments) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] QueueUnbind[ queue: " + queueName + " exchange: " + exchange + " bindingKey: " + bindingKey + " arguments: " + arguments + " ]");
        }
        // Value is not used below; the lookup is kept so the call sequence is unchanged.
        NamedAddressSpace addressSpace = this._connection.getAddressSpace();

        final boolean useDefaultQueue = queueName == null;
        final Queue<?> queue;
        if (useDefaultQueue) {
            queue = this.getDefaultQueue();
        } else {
            queue = this.getQueue(queueName.toString());
        }

        if (queue == null) {
            final String message;
            if (useDefaultQueue) {
                message = "No default queue defined on channel and queue was null";
            } else {
                message = "Queue '" + queueName + "' does not exist.";
            }
            this.closeChannel(404, message);
            return;
        }
        if (this.isDefaultExchange(exchange)) {
            this._connection.sendConnectionClose(530, "Cannot unbind the queue '" + queue.getName() + "' from the default exchange", this.getChannelId());
            return;
        }

        final Exchange<?> exch = this.getExchange(exchange.toString());
        final String bindingKeyStr = bindingKey == null ? "" : AMQShortString.toString(bindingKey);
        if (exch == null) {
            this.closeChannel(404, "Exchange '" + exchange + "' does not exist.");
            return;
        }
        if (!exch.hasBinding(bindingKeyStr, queue)) {
            this.closeChannel(404, "No such binding");
            return;
        }

        try {
            exch.deleteBinding(bindingKeyStr, queue);
            final QueueUnbindOkBody unbindOk = this._connection.getMethodRegistry().createQueueUnbindOkBody();
            this.sync();
            this._connection.writeFrame(unbindOk.generateFrame(this.getChannelId()));
        }
        catch (AccessControlException e) {
            this._connection.sendConnectionClose(403, e.getMessage(), this.getChannelId());
        }
    }

    /**
     * Handles a Tx.Select frame: replaces the channel's transaction with a new local transaction,
     * registers transaction tickers for it and replies with Tx.SelectOk.
     *
     * <p>If the current transaction is already a {@code LocalTransaction}, its tickers are
     * unregistered first.
     */
    @Override
    public void receiveTxSelect() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] TxSelect");
        }
        final ServerTransaction current = this._transaction;
        if (current instanceof LocalTransaction) {
            this.getConnection().unregisterTransactionTickers(this._transaction);
        }
        this._transaction = this._connection.createLocalTransaction();
        final long notificationRepeatPeriod = (Long)this.getContextValue(Long.class, "qpid.session.transactionTimeoutNotificationRepeatPeriod");
        // The ticker callback closes the connection asynchronously with a transaction-timeout reason.
        this.getConnection().registerTransactionTickers(
                this._transaction,
                message -> this._connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, (String)message),
                notificationRepeatPeriod);
        final TxSelectOkBody selectOk = this._connection.getMethodRegistry().createTxSelectOkBody();
        this._connection.writeFrame(selectOk.generateFrame(this._channelId));
    }

    /**
     * Handles a received TxCommit.
     *
     * <p>If the channel is not transactional it is closed with code 503. Otherwise {@code commit} is
     * invoked with a task that writes this channel's {@code _txCommitOkFrame} to the connection.
     */
    @Override
    public void receiveTxCommit() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] TxCommit");
        }

        if (!this.isTransactional()) {
            this.closeChannel(503, "Fatal error: commit called on non-transactional channel");
            return;
        }

        // The frame is written by the task; when the task runs is decided by commit().
        final Runnable writeCommitOk = () -> this._connection.writeFrame(this._txCommitOkFrame);
        this.commit(writeCommitOk, true);
    }

    /**
     * Handles a received TxRollback.
     *
     * <p>If the channel is not transactional it is closed with code 503. Otherwise the transaction is
     * rolled back with a task that writes a TxRollbackOk frame on this channel, and {@code resend()}
     * is called afterwards.
     */
    @Override
    public void receiveTxRollback() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] TxRollback");
        }

        if (!this.isTransactional()) {
            this.closeChannel(503, "Fatal error: rollback called on non-transactional channel");
            return;
        }

        // The response body is created up front; the frame itself is generated when the task runs.
        final TxRollbackOkBody rollbackOk = this._connection.getMethodRegistry().createTxRollbackOkBody();
        final Runnable writeRollbackOk = () -> this._connection.writeFrame(rollbackOk.generateFrame(this._channelId));
        this.rollback(writeRollbackOk);
        this.resend();
    }

    /**
     * Handles a received ConfirmSelect by turning on {@code _confirmOnPublish} for this channel.
     *
     * @param nowait when {@code true} no ConfirmSelectOk frame is written back
     */
    @Override
    public void receiveConfirmSelect(boolean nowait) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + this._channelId + "] ConfirmSelect [ nowait: " + nowait + " ]");
        }

        this._confirmOnPublish = true;
        if (nowait) {
            return;
        }
        this._connection.writeFrame(new AMQFrame(this._channelId, ConfirmSelectOkBody.INSTANCE));
    }

    /**
     * Delegates to the connection to close this channel and write the corresponding frame.
     *
     * @param cause numeric code passed through to the connection
     * @param message text passed through to the connection
     */
    private void closeChannel(int cause, String message) {
        _connection.closeChannelAndWriteFrame(this, cause, message);
    }

    /**
     * Tells whether the given name denotes the default exchange.
     *
     * @param exchangeName exchange name, may be {@code null}
     * @return {@code true} when the name is {@code null} or equal to {@code AMQShortString.EMPTY_STRING}
     */
    private boolean isDefaultExchange(AMQShortString exchangeName) {
        if (exchangeName == null) {
            return true;
        }
        return AMQShortString.EMPTY_STRING.equals(exchangeName);
    }

    /**
     * Replaces the channel's default queue.
     *
     * <p>When the queue actually changes, the association-clearing delete task is removed from the
     * previous default queue (if any) and added to the new one (if any).
     *
     * @param queue the new default queue, or {@code null} for none
     */
    private void setDefaultQueue(Queue<?> queue) {
        final Queue<?> previous = this._defaultQueue;
        final boolean changed = queue != previous;

        if (changed && previous != null) {
            previous.removeDeleteTask((Action)this._defaultQueueAssociationClearingTask);
        }
        if (changed && queue != null) {
            queue.addDeleteTask((Action)this._defaultQueueAssociationClearingTask);
        }

        this._defaultQueue = queue;
    }

    /**
     * @return the channel's current default queue, or {@code null} if none is set
     */
    private Queue<?> getDefaultQueue() {
        final Queue<?> current = this._defaultQueue;
        return current;
    }

    /**
     * Brings the blocking state communicated to the peer in line with the desired state held in
     * {@code _blocking}.
     *
     * <p>Nothing happens when both already agree. Otherwise the wire state is updated, a flow
     * command is sent with the negated blocking flag, and {@code _blockTime} is set to the current
     * time when blocking or reset to zero when unblocking.
     */
    protected void updateBlockedStateIfNecessary() {
        final boolean shouldBlock = this._blocking.get();
        if (shouldBlock == this._wireBlockingState) {
            return;
        }

        this._wireBlockingState = shouldBlock;
        this.sendFlow(!shouldBlock);
        if (shouldBlock) {
            this._blockTime = System.currentTimeMillis();
        } else {
            this._blockTime = 0L;
        }
    }

    /**
     * Gives credit back to the credit manager and, depending on how the credit situation changed,
     * either calls {@code updateAllConsumerNotifyWorkDesired()} or notifies only the given target.
     *
     * @param target the consumer target whose {@code notifyWork()} may be invoked
     * @param count count passed to the credit manager's {@code restoreCredit}
     * @param size size passed to the credit manager's {@code restoreCredit}
     */
    @Override
    public void restoreCredit(ConsumerTarget target, int count, long size) {
        final boolean hadCredit = this._creditManager.hasCredit();
        this._creditManager.restoreCredit(count, size);

        final boolean creditStateFlipped = this._creditManager.hasCredit() != hadCredit;
        if (creditStateFlipped) {
            if (hadCredit || !this._creditManager.isNotBytesLimitedAndHighPrefetch()) {
                this.updateAllConsumerNotifyWorkDesired();
            }
            return;
        }

        // Credit state is unchanged; there is nothing to do if there was no credit to begin with.
        if (!hadCredit) {
            return;
        }

        if (this._creditManager.isNotBytesLimitedAndHighPrefetch()) {
            if (this._creditManager.isCreditOverBatchLimit()) {
                this.updateAllConsumerNotifyWorkDesired();
            }
            return;
        }

        if (this._creditManager.isBytesLimited()) {
            target.notifyWork();
        }
    }

    /**
     * @return the values view of the tag-to-subscription-target map
     */
    private Collection<ConsumerTarget_0_8> getConsumerTargets() {
        final Collection<ConsumerTarget_0_8> targets = this._tag2SubscriptionTargetMap.values();
        return targets;
    }

    /**
     * Looks up a message destination by name in the address space and returns it if it is an
     * exchange.
     *
     * @param name destination name to look up
     * @return the exchange, or {@code null} when the lookup result is not an {@code Exchange}
     */
    private Exchange<?> getExchange(String name) {
        final MessageDestination found = this.getAddressSpace().getAttainedMessageDestination(name, false);
        if (found instanceof Exchange) {
            return (Exchange)found;
        }
        return null;
    }

    /**
     * Looks up a message source by name in the address space and returns it if it is a queue.
     *
     * @param name source name to look up
     * @return the queue, or {@code null} when the lookup result is not a {@code Queue}
     */
    private Queue<?> getQueue(String name) {
        final MessageSource found = this.getAddressSpace().getAttainedMessageSource(name);
        if (found instanceof Queue) {
            return (Queue)found;
        }
        return null;
    }

    /**
     * Disposes the TxCommitOk frame and, if a message is currently being received, clears it from
     * the channel and disposes its content header and every content chunk.
     */
    public void dispose() {
        this._txCommitOkFrame.dispose();

        final IncomingMessage pending = this._currentMessage;
        if (pending == null) {
            return;
        }
        this._currentMessage = null;

        final ContentHeaderBody header = pending.getContentHeader();
        if (header != null) {
            header.dispose();
        }

        // The chunk count is read once; a non-positive count simply skips the loop.
        final int chunkCount = pending.getBodyCount();
        for (int index = 0; index < chunkCount; index++) {
            pending.getContentChunk(index).dispose();
        }
    }

    /**
     * Queue action that forgets the channel's default queue when invoked with that very queue.
     */
    private class DefaultQueueAssociationClearingTask implements Action<Queue<?>> {
        private DefaultQueueAssociationClearingTask() {
        }

        /**
         * Clears {@code _defaultQueue} if, and only if, it is the same instance as {@code queue}.
         */
        public void performAction(Queue<?> queue) {
            if (queue != AMQChannel.this._defaultQueue) {
                return;
            }
            AMQChannel.this._defaultQueue = null;
        }
    }

    /**
     * Transaction action that writes a return for a message back to the client after commit.
     *
     * <p>A reference to the message is taken at construction time and released exactly once, on
     * either {@link #postCommit()} or {@link #onRollback()}.
     */
    private class WriteReturnAction
    implements ServerTransaction.Action {
        private final int _errorCode;
        private final String _description;
        private final MessageReference<AMQMessage> _reference;

        /**
         * @param errorCode reply code to send with the return
         * @param description reply text to send with the return
         * @param message the message being returned; a new reference to it is acquired here
         */
        public WriteReturnAction(int errorCode, String description, AMQMessage message) {
            this._errorCode = errorCode;
            this._description = description;
            this._reference = message.newReference();
        }

        /**
         * Writes the return on this channel and releases the message reference.
         *
         * <p>The release happens in a {@code finally} block so the reference is not leaked when
         * writing the return throws.
         */
        public void postCommit() {
            try {
                AMQMessage message = (AMQMessage)this._reference.getMessage();
                AMQChannel.this._connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), (MessageContentSource)message, AMQChannel.this._channelId, this._errorCode, AMQShortString.validValueOf(this._description));
            }
            finally {
                this._reference.release();
            }
        }

        /**
         * Releases the message reference without writing anything.
         */
        public void onRollback() {
            this._reference.release();
        }
    }

    /**
     * Transaction action covering a set of acknowledged messages: they are deleted after commit and
     * either queued for resend or released on rollback.
     */
    private class MessageAcknowledgeAction implements ServerTransaction.Action {
        private Collection<MessageConsumerAssociation> _ackedMessages;

        /**
         * @param ackedMessages the message/consumer associations acknowledged in the transaction
         */
        public MessageAcknowledgeAction(Collection<MessageConsumerAssociation> ackedMessages) {
            this._ackedMessages = ackedMessages;
        }

        /**
         * Deletes every acknowledged message instance; the held collection is replaced with an
         * empty set afterwards, even if a delete throws.
         */
        public void postCommit() {
            try {
                for (MessageConsumerAssociation acked : this._ackedMessages) {
                    acked.getMessageInstance().delete();
                }
            }
            finally {
                this._ackedMessages = Collections.emptySet();
            }
        }

        /**
         * While the channel's {@code _rollingBack} flag is set, each message's acquisition is made
         * stealable and the associations are appended to the channel's resend list. Otherwise each
         * message is released from its consumer and the held collection is emptied.
         */
        public void onRollback() {
            if (!AMQChannel.this._rollingBack) {
                this.releaseAckedMessages();
                return;
            }

            for (MessageConsumerAssociation acked : this._ackedMessages) {
                acked.getMessageInstance().makeAcquisitionStealable();
            }
            AMQChannel.this._resendList.addAll(this._ackedMessages);
        }

        /** Releases each acked message from its consumer, then drops the held collection. */
        private void releaseAckedMessages() {
            try {
                for (MessageConsumerAssociation acked : this._ackedMessages) {
                    MessageInstance instance = acked.getMessageInstance();
                    instance.release(acked.getConsumer());
                }
            }
            finally {
                this._ackedMessages = Collections.emptySet();
            }
        }
    }

    /**
     * Action applied to a message instance that was not delivered to a consumer: the instance is
     * acquired, deleted and dequeued in its own local transaction, and a return with reply code 313
     * is written to the client once that transaction commits.
     */
    private class ImmediateAction implements Action<MessageInstance> {
        /**
         * Does nothing when the entry was already delivered to a consumer or cannot be acquired.
         *
         * @param entry the message instance to examine
         */
        public void performAction(MessageInstance entry) {
            if (entry.getDeliveredToConsumer() || !entry.acquire()) {
                return;
            }

            LocalTransaction dequeueTxn = new LocalTransaction(AMQChannel.this._messageStore);
            final AMQMessage message = (AMQMessage)entry.getMessage();
            // A reference is held for the duration of the dequeue and always released afterwards.
            MessageReference<AMQMessage> heldReference = message.newReference();
            try {
                entry.delete();
                ServerTransaction.Action writeReturnOnCommit = new ServerTransaction.Action(){

                    public void postCommit() {
                        ProtocolOutputConverter converter = AMQChannel.this._connection.getProtocolOutputConverter();
                        converter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), (MessageContentSource)message, AMQChannel.this._channelId, 313, IMMEDIATE_DELIVERY_REPLY_TEXT);
                    }

                    public void onRollback() {
                    }
                };
                dequeueTxn.dequeue(entry.getEnqueueRecord(), writeReturnOnCommit);
                dequeueTxn.commit();
            }
            finally {
                heldReference.release();
            }
        }
    }

    /**
     * Delivery method that writes a GetOk for a message and remembers that a delivery took place.
     */
    private class GetDeliveryMethod implements ClientDeliveryMethod {
        private final MessageSource _queue;
        private boolean _deliveredMessage;

        /**
         * @param queue the message source the get is performed against
         */
        public GetDeliveryMethod(MessageSource queue) {
            this._queue = queue;
        }

        /**
         * Writes a GetOk on this channel. The reported queue size is the source's message depth when
         * the source is a {@code Queue}, and zero otherwise.
         *
         * @return the value returned by {@code writeGetOk}
         */
        @Override
        public long deliverToClient(ConsumerTarget_0_8 target, AMQMessage message, InstanceProperties props, long deliveryTag) {
            int depth = 0;
            if (this._queue instanceof Queue) {
                depth = ((Queue)this._queue).getQueueDepthMessages();
            }

            final long written = AMQChannel.this._connection.getProtocolOutputConverter().writeGetOk(message, props, AMQChannel.this.getChannelId(), deliveryTag, depth);
            this._deliveredMessage = true;
            return written;
        }

        /**
         * @return {@code true} once {@code deliverToClient} has completed at least once
         */
        public boolean hasDeliveredMessage() {
            return this._deliveredMessage;
        }
    }

    /**
     * Message filter that rejects messages whose connection reference is the same object as this
     * channel's connection reference, captured when the filter is created.
     */
    private class NoLocalFilter implements MessageFilter {
        private final Object _connectionReference;

        public NoLocalFilter() {
            this._connectionReference = AMQChannel.this.getConnectionReference();
        }

        /**
         * @return the string form of {@code AMQPFilterTypes.NO_LOCAL}
         */
        public String getName() {
            return AMQPFilterTypes.NO_LOCAL.toString();
        }

        /**
         * @return {@code true} unless the message's connection reference is identical (by
         *         reference) to the one captured by this filter
         */
        public boolean matches(Filterable message) {
            final Object origin = message.getConnectionReference();
            return origin != this._connectionReference;
        }

        /**
         * @return always {@code false}
         */
        public boolean startAtTail() {
            return false;
        }

        public String toString() {
            return "NoLocalFilter[]";
        }
    }
}

