/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import com.google.common.util.concurrent.ListenableFuture;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AccessRequestOkBody;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.BasicNackBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.framing.ConfirmSelectOkBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.QueuePurgeOkBody;
import org.apache.qpid.framing.QueueUnbindOkBody;
import org.apache.qpid.framing.ServerChannelMethodProcessor;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UnknownConfiguredObjectException;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTagInUseException;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.protocol.v0_8.ExtractResendAndRequeue;
import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.MessageOnlyCreditManager;
import org.apache.qpid.server.protocol.v0_8.NoAckCreditManager;
import org.apache.qpid.server.protocol.v0_8.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQChannel
implements AMQSessionModel<AMQChannel>,
AsyncAutoCommitTransaction.FutureRecorder,
ServerChannelMethodProcessor {
    public static final int DEFAULT_PREFETCH = 4096;
    private static final Logger _logger = LoggerFactory.getLogger(AMQChannel.class);
    private final DefaultQueueAssociationClearingTask _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
    private final boolean _messageAuthorizationRequired;
    private final int _channelId;
    private final Pre0_10CreditManager _creditManager;
    private final FlowCreditManager _noAckCreditManager;
    private final AccessControlContext _accessControllerContext;
    private long _deliveryTag = 0L;
    private volatile AMQQueue<?> _defaultQueue;
    private int _consumerTag;
    private IncomingMessage _currentMessage;
    private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
    private final List<ConsumerTarget_0_8> _consumersWithPendingWork = new ArrayList<ConsumerTarget_0_8>();
    private final MessageStore _messageStore;
    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList();
    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(4096);
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    private ServerTransaction _transaction;
    private final AtomicLong _txnStarts = new AtomicLong(0L);
    private final AtomicLong _txnCommits = new AtomicLong(0L);
    private final AtomicLong _txnRejects = new AtomicLong(0L);
    private final AtomicLong _txnCount = new AtomicLong(0L);
    private final AMQPConnection_0_8 _connection;
    private AtomicBoolean _closing = new AtomicBoolean(false);
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet());
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    private LogSubject _logSubject;
    private volatile boolean _rollingBack;
    private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
    private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
    private final ClientDeliveryMethod _clientDeliveryMethod;
    private final TransactionTimeoutHelper _transactionTimeoutHelper;
    private final UUID _id = UUID.randomUUID();
    private final List<Action<? super AMQChannel>> _taskList = new CopyOnWriteArrayList<Action<? super AMQChannel>>();
    private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
    private final ImmediateAction _immediateAction = new ImmediateAction();
    private final Subject _subject;
    private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList();
    private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
    private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList();
    private Session<?> _modelObject;
    private long _blockTime;
    private long _blockingTimeout;
    private boolean _confirmOnPublish;
    private long _confirmedMessageCounter;
    private volatile long _uncommittedMessageSize;
    private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<StoredMessage<MessageMetaData>>();
    private long _maxUncommittedInMemorySize;
    private boolean _wireBlockingState;
    private boolean _prefetchLoggedForChannel = false;
    private boolean _logChannelFlowMessages = true;
    private final String id = "(" + System.identityHashCode(this) + ")";
    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod(){

        @Override
        public void recordMessageDelivery(ConsumerImpl sub, MessageInstance entry, long deliveryTag) {
            AMQChannel.this.addUnacknowledgedMessage(entry, deliveryTag, sub);
        }
    };

    public AMQChannel(AMQPConnection_0_8 connection, int channelId, MessageStore messageStore) {
        this._creditManager = new Pre0_10CreditManager(0L, 0L, (ProtocolEngine)connection);
        this._noAckCreditManager = new NoAckCreditManager((ProtocolEngine)connection);
        this._connection = connection;
        this._channelId = channelId;
        this._subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(), connection.getAuthorizedSubject().getPublicCredentials(), connection.getAuthorizedSubject().getPrivateCredentials());
        this._subject.getPrincipals().add((Principal)new SessionPrincipal((AMQSessionModel)this));
        this._accessControllerContext = SecurityManager.getAccessControlContextFromSubject((Subject)this._subject);
        this._maxUncommittedInMemorySize = (Long)connection.getVirtualHost().getContextValue(Long.class, "connection.maxUncommittedInMemorySize");
        this._messageAuthorizationRequired = (Boolean)connection.getVirtualHost().getContextValue(Boolean.class, "qpid.broker_msg_auth");
        this._logSubject = new ChannelLogSubject((AMQSessionModel)this);
        this._messageStore = messageStore;
        this._blockingTimeout = (Long)connection.getBroker().getContextValue(Long.class, "channel.flowControlEnforcementTimeout");
        this._transaction = new AsyncAutoCommitTransaction(this._messageStore, (AsyncAutoCommitTransaction.FutureRecorder)this);
        this._clientDeliveryMethod = connection.createDeliveryMethod(this._channelId);
        this._transactionTimeoutHelper = new TransactionTimeoutHelper(this._logSubject, new TransactionTimeoutHelper.CloseAction(){

            public void doTimeoutAction(String reason) {
                AMQChannel.this._connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason);
            }
        }, (EventLoggerProvider)this.getVirtualHost());
        AccessController.doPrivileged(new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                AMQChannel.this.getVirtualHost().getEventLogger().message(ChannelMessages.CREATE());
                return null;
            }
        }, this._accessControllerContext);
    }

    public AccessControlContext getAccessControllerContext() {
        return this._accessControllerContext;
    }

    private boolean performGet(MessageSource queue, boolean acks) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused {
        MessageOnlyCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
        GetDeliveryMethod getDeliveryMethod = new GetDeliveryMethod(singleMessageCredit, queue);
        RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod(){

            @Override
            public void recordMessageDelivery(ConsumerImpl sub, MessageInstance entry, long deliveryTag) {
                AMQChannel.this.addUnacknowledgedMessage(entry, deliveryTag, null);
            }
        };
        EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES);
        ConsumerTarget_0_8 target = acks ? ConsumerTarget_0_8.createAckTarget(this, AMQShortString.EMPTY_STRING, null, singleMessageCredit, getDeliveryMethod, getRecordMethod) : ConsumerTarget_0_8.createGetNoAckTarget(this, AMQShortString.EMPTY_STRING, null, singleMessageCredit, getDeliveryMethod, getRecordMethod);
        ConsumerImpl sub = queue.addConsumer((ConsumerTarget)target, null, AMQMessage.class, "", options);
        sub.flush();
        sub.close();
        return getDeliveryMethod.hasDeliveredMessage();
    }

    public void setLocalTransactional() {
        this._transaction = new LocalTransaction(this._messageStore, new LocalTransaction.ActivityTimeAccessor(){

            public long getActivityTime() {
                return AMQChannel.this._connection.getLastReadTime();
            }
        });
        this._txnStarts.incrementAndGet();
    }

    public boolean isTransactional() {
        return this._transaction.isTransactional();
    }

    public void receivedComplete() {
        this.sync();
    }

    private void incrementOutstandingTxnsIfNecessary() {
        if (this.isTransactional()) {
            this._txnCount.compareAndSet(0L, 1L);
        }
    }

    private void decrementOutstandingTxnsIfNecessary() {
        if (this.isTransactional()) {
            this._txnCount.compareAndSet(1L, 0L);
        }
    }

    public Long getTxnCommits() {
        return this._txnCommits.get();
    }

    public Long getTxnRejects() {
        return this._txnRejects.get();
    }

    public Long getTxnCount() {
        return this._txnCount.get();
    }

    public Long getTxnStart() {
        return this._txnStarts.get();
    }

    public int getChannelId() {
        return this._channelId;
    }

    public void setPublishFrame(MessagePublishInfo info, MessageDestination e) {
        String routingKey = AMQShortString.toString((AMQShortString)info.getRoutingKey());
        VirtualHostImpl virtualHost = this.getVirtualHost();
        SecurityManager securityManager = virtualHost.getSecurityManager();
        securityManager.authorisePublish(info.isImmediate(), routingKey, e.getName(), virtualHost.getName(), this._subject);
        this._currentMessage = new IncomingMessage(info);
        this._currentMessage.setMessageDestination(e);
    }

    public void publishContentHeader(ContentHeaderBody contentHeaderBody) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Content header received on channel " + this._channelId);
        }
        this._currentMessage.setContentHeaderBody(contentHeaderBody);
        this.deliverCurrentMessageIfComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deliverCurrentMessageIfComplete() {
        if (this._currentMessage.allContentReceived()) {
            if (this._confirmOnPublish) {
                ++this._confirmedMessageCounter;
            }
            Runnable finallyAction = null;
            ContentHeaderBody contentHeader = this._currentMessage.getContentHeader();
            long bodySize = this._currentMessage.getSize();
            long timestamp = contentHeader.getProperties().getTimestamp();
            try {
                final MessagePublishInfo messagePublishInfo = this._currentMessage.getMessagePublishInfo();
                MessageDestination destination = this._currentMessage.getDestination();
                MessageMetaData messageMetaData = new MessageMetaData(messagePublishInfo, contentHeader, this.getConnection().getLastReadTime());
                MessageHandle handle = this._messageStore.addMessage((StorableMessageMetaData)messageMetaData);
                int bodyCount = this._currentMessage.getBodyCount();
                if (bodyCount > 0) {
                    long bodyLengthReceived = 0L;
                    for (int i = 0; i < bodyCount; ++i) {
                        ContentBody contentChunk = this._currentMessage.getContentChunk(i);
                        handle.addContent(contentChunk.getPayload());
                        bodyLengthReceived += (long)contentChunk.getSize();
                        contentChunk.dispose();
                    }
                }
                StoredMessage storedMessage = handle.allContentAdded();
                final AMQMessage amqMessage = this.createAMQMessage((StoredMessage<MessageMetaData>)storedMessage);
                MessageReference reference = amqMessage.newReference();
                try {
                    this._currentMessage = null;
                    if (!this.checkMessageUserId(contentHeader)) {
                        if (this._confirmOnPublish) {
                            this._connection.writeFrame((AMQDataBlock)new AMQFrame(this._channelId, (AMQBody)new BasicNackBody(this._confirmedMessageCounter, false, false)));
                        }
                        this._transaction.addPostTransactionAction((ServerTransaction.Action)new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
                    } else {
                        final boolean immediate = messagePublishInfo.isImmediate();
                        InstanceProperties instanceProperties = new InstanceProperties(){

                            public Object getProperty(InstanceProperties.Property prop) {
                                switch (prop) {
                                    case EXPIRATION: {
                                        return amqMessage.getExpiration();
                                    }
                                    case IMMEDIATE: {
                                        return immediate;
                                    }
                                    case PERSISTENT: {
                                        return amqMessage.isPersistent();
                                    }
                                    case MANDATORY: {
                                        return messagePublishInfo.isMandatory();
                                    }
                                    case REDELIVERED: {
                                        return false;
                                    }
                                }
                                return null;
                            }
                        };
                        int enqueues = destination.send((ServerMessage)amqMessage, amqMessage.getInitialRoutingAddress(), instanceProperties, this._transaction, (Action)(immediate ? this._immediateAction : this._capacityCheckAction));
                        if (enqueues == 0) {
                            finallyAction = this.handleUnroutableMessage(amqMessage);
                        } else {
                            if (this._confirmOnPublish) {
                                BasicAckBody responseBody = this._connection.getMethodRegistry().createBasicAckBody(this._confirmedMessageCounter, false);
                                this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
                            }
                            this.incrementUncommittedMessageSize((StoredMessage<MessageMetaData>)storedMessage);
                            this.incrementOutstandingTxnsIfNecessary();
                        }
                    }
                }
                finally {
                    reference.release();
                    if (finallyAction != null) {
                        finallyAction.run();
                    }
                }
            }
            finally {
                this._connection.registerMessageReceived(bodySize, timestamp);
                this._currentMessage = null;
            }
        }
    }

    private void incrementUncommittedMessageSize(StoredMessage<MessageMetaData> handle) {
        if (this.isTransactional()) {
            this._uncommittedMessageSize += (long)((MessageMetaData)handle.getMetaData()).getContentSize();
            if (this._uncommittedMessageSize > this.getMaxUncommittedInMemorySize()) {
                handle.flowToDisk();
                if (!this._uncommittedMessages.isEmpty() || this._uncommittedMessageSize == (long)((MessageMetaData)handle.getMetaData()).getContentSize()) {
                    this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.LARGE_TRANSACTION_WARN((Number)this._uncommittedMessageSize));
                }
                if (!this._uncommittedMessages.isEmpty()) {
                    for (StoredMessage<MessageMetaData> uncommittedHandle : this._uncommittedMessages) {
                        uncommittedHandle.flowToDisk();
                    }
                    this._uncommittedMessages.clear();
                }
            } else {
                this._uncommittedMessages.add(handle);
            }
        }
    }

    private Runnable handleUnroutableMessage(AMQMessage message) {
        boolean mandatory = message.isMandatory();
        String exchangeName = AMQShortString.toString((AMQShortString)message.getMessagePublishInfo().getExchange());
        String routingKey = AMQShortString.toString((AMQShortString)message.getMessagePublishInfo().getRoutingKey());
        final String description = String.format("[Exchange: %s, Routing key: %s]", exchangeName, routingKey);
        boolean closeOnNoRoute = this._connection.isCloseWhenNoRoute();
        Runnable returnVal = null;
        if (_logger.isDebugEnabled()) {
            _logger.debug(String.format("Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s", description, mandatory, this.isTransactional(), closeOnNoRoute));
        }
        if (mandatory && this.isTransactional() && !this._confirmOnPublish && this._connection.isCloseWhenNoRoute()) {
            returnVal = new Runnable(){

                @Override
                public void run() {
                    AMQChannel.this._connection.sendConnectionClose(AMQConstant.NO_ROUTE, "No route for message " + description, AMQChannel.this._channelId);
                }
            };
        } else if (mandatory || message.isImmediate()) {
            if (this._confirmOnPublish) {
                this._connection.writeFrame((AMQDataBlock)new AMQFrame(this._channelId, (AMQBody)new BasicNackBody(this._confirmedMessageCounter, false, false)));
            }
            this._transaction.addPostTransactionAction((ServerTransaction.Action)new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + description, message));
        } else {
            this.getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG((String)exchangeName, (String)routingKey));
        }
        return returnVal;
    }

    public void publishContentBody(ContentBody contentBody) {
        if (_logger.isDebugEnabled()) {
            _logger.debug(this.debugIdentity() + " content body received on channel " + this._channelId);
        }
        try {
            long currentSize = this._currentMessage.addContentBodyFrame(contentBody);
            if (currentSize > this._currentMessage.getSize()) {
                this._connection.sendConnectionClose(AMQConstant.FRAME_ERROR, "More message data received than content header defined", this._channelId);
            } else {
                this.deliverCurrentMessageIfComplete();
            }
        }
        catch (RuntimeException e) {
            this._currentMessage = null;
            throw e;
        }
    }

    public long getNextDeliveryTag() {
        return ++this._deliveryTag;
    }

    public int getNextConsumerTag() {
        return ++this._consumerTag;
    }

    public ConsumerTarget getSubscription(AMQShortString tag) {
        return (ConsumerTarget)this._tag2SubscriptionTargetMap.get(tag);
    }

    public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable arguments, boolean exclusive, boolean noLocal) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, MessageSource.ConsumerAccessRefused, ConsumerTagInUseException {
        ConsumerTarget_0_8 target;
        if (tag == null) {
            tag = new AMQShortString("sgen_" + this.getNextConsumerTag());
        }
        if (this._tag2SubscriptionTargetMap.containsKey(tag)) {
            throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
        }
        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
        if (arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) {
            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, this._noAckCreditManager);
        } else if (acks) {
            target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, this._creditManager);
            options.add(ConsumerImpl.Option.ACQUIRES);
            options.add(ConsumerImpl.Option.SEES_REQUEUES);
        } else {
            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, this._noAckCreditManager);
            options.add(ConsumerImpl.Option.ACQUIRES);
            options.add(ConsumerImpl.Option.SEES_REQUEUES);
        }
        if (exclusive) {
            options.add(ConsumerImpl.Option.EXCLUSIVE);
        }
        this._tag2SubscriptionTargetMap.put(tag, target);
        try {
            FilterManager filterManager = FilterManagerFactory.createManager((Map)FieldTable.convertToMap((FieldTable)arguments));
            if (noLocal) {
                if (filterManager == null) {
                    filterManager = new FilterManager();
                }
                NoLocalFilter filter = new NoLocalFilter();
                filterManager.add(filter.getName(), (MessageFilter)filter);
            }
            if (arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) {
                long period;
                Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString());
                if (value instanceof Number) {
                    period = ((Number)value).longValue();
                } else if (value instanceof String) {
                    try {
                        period = Long.parseLong(value.toString());
                    }
                    catch (NumberFormatException e) {
                        throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
                    }
                } else {
                    throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
                }
                long startingFrom = System.currentTimeMillis() - 1000L * period;
                if (filterManager == null) {
                    filterManager = new FilterManager();
                }
                ArrivalTimeFilter filter = new ArrivalTimeFilter(startingFrom, period == 0L);
                filterManager.add(filter.getName(), (MessageFilter)filter);
            }
            for (MessageSource source : sources) {
                ConsumerImpl sub = source.addConsumer((ConsumerTarget)target, filterManager, AMQMessage.class, AMQShortString.toString((AMQShortString)tag), options);
                if (!(sub instanceof Consumer)) continue;
                Consumer modelConsumer = (Consumer)sub;
                this.consumerAdded(modelConsumer);
                modelConsumer.addChangeListener(this._consumerClosedListener);
                this._consumers.add(modelConsumer);
            }
        }
        catch (AccessControlException | AMQInvalidArgumentException | MessageSource.ConsumerAccessRefused | MessageSource.ExistingConsumerPreventsExclusive | MessageSource.ExistingExclusiveConsumer e) {
            this._tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        return tag;
    }

    public boolean unsubscribeConsumer(AMQShortString consumerTag) {
        ConsumerTarget_0_8 target;
        List<ConsumerImpl> subs;
        if (_logger.isDebugEnabled()) {
            _logger.debug("Unsubscribing consumer '{}' on channel {}", (Object)consumerTag, (Object)this);
        }
        List<ConsumerImpl> list = subs = (target = this._tag2SubscriptionTargetMap.remove(consumerTag)) == null ? null : target.getConsumers();
        if (subs != null) {
            for (ConsumerImpl sub : subs) {
                sub.close();
                if (!(sub instanceof Consumer)) continue;
                this._consumers.remove(sub);
            }
            return true;
        }
        _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered.");
        return false;
    }

    public void close() {
        this.close(null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(AMQConstant cause, String message) {
        LogMessage logMessage;
        if (!this._closing.compareAndSet(false, true)) {
            return;
        }
        try {
            this.unsubscribeAllConsumers();
            this.setDefaultQueue(null);
            if (this._modelObject != null) {
                this._modelObject.delete();
            }
            for (Action<? super AMQChannel> task : this._taskList) {
                task.performAction((Object)this);
            }
            this._transaction.rollback();
            this.requeue();
            logMessage = cause == null ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED((Number)cause.getCode(), (String)message);
        }
        catch (Throwable throwable) {
            LogMessage operationalLogMessage = cause == null ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED((Number)cause.getCode(), (String)message);
            this.getVirtualHost().getEventLogger().message(this._logSubject, operationalLogMessage);
            throw throwable;
        }
        LogMessage operationalLogMessage = logMessage;
        this.getVirtualHost().getEventLogger().message(this._logSubject, operationalLogMessage);
    }

    private void unsubscribeAllConsumers() {
        if (_logger.isDebugEnabled()) {
            if (!this._tag2SubscriptionTargetMap.isEmpty()) {
                _logger.debug("Unsubscribing all consumers on channel " + this.toString());
            } else {
                _logger.debug("No consumers to unsubscribe on channel " + this.toString());
            }
        }
        HashSet<AMQShortString> subscriptionTags = new HashSet<AMQShortString>(this._tag2SubscriptionTargetMap.keySet());
        for (AMQShortString tag : subscriptionTags) {
            this.unsubscribeConsumer(tag);
        }
    }

    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) {
        if (_logger.isDebugEnabled()) {
            _logger.debug(this.debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + ") for " + consumer + " on " + entry.getOwningResource().getName());
        }
        this._unacknowledgedMessageMap.add(deliveryTag, entry);
    }

    public String debugIdentity() {
        return this._channelId + this.id;
    }

    private void requeue() {
        Collection<MessageInstance> messagesToBeDelivered = this._unacknowledgedMessageMap.cancelAllMessages();
        if (!messagesToBeDelivered.isEmpty() && _logger.isDebugEnabled()) {
            _logger.debug("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + this.toString());
        }
        for (MessageInstance unacked : messagesToBeDelivered) {
            unacked.setRedelivered();
            unacked.release(unacked.getAcquiringConsumer());
        }
    }

    public void requeue(long deliveryTag) {
        MessageInstance unacked = this._unacknowledgedMessageMap.remove(deliveryTag);
        if (unacked != null) {
            unacked.setRedelivered();
            unacked.release(unacked.getAcquiringConsumer());
        } else {
            _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + this._unacknowledgedMessageMap.size());
        }
    }

    public boolean isMaxDeliveryCountEnabled(long deliveryTag) {
        MessageInstance queueEntry = this._unacknowledgedMessageMap.get(deliveryTag);
        if (queueEntry != null) {
            int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
            return maximumDeliveryCount > 0;
        }
        return false;
    }

    public boolean isDeliveredTooManyTimes(long deliveryTag) {
        MessageInstance queueEntry = this._unacknowledgedMessageMap.get(deliveryTag);
        if (queueEntry != null) {
            int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
            int numDeliveries = queueEntry.getDeliveryCount();
            return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
        }
        return false;
    }

    private void resend() {
        long deliveryTag;
        MessageInstance message;
        LinkedHashMap<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
        LinkedHashMap<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
        if (_logger.isDebugEnabled()) {
            _logger.debug("unacked map Size:" + this._unacknowledgedMessageMap.size());
        }
        this._unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(this._unacknowledgedMessageMap, msgToRequeue, msgToResend));
        if (_logger.isDebugEnabled()) {
            if (!msgToResend.isEmpty()) {
                _logger.debug("Preparing (" + msgToResend.size() + ") message to resend.");
            } else {
                _logger.debug("No message to resend.");
            }
        }
        for (Map.Entry entry : msgToResend.entrySet()) {
            message = (MessageInstance)entry.getValue();
            deliveryTag = (Long)entry.getKey();
            message.decrementDeliveryCount();
            message.setRedelivered();
            if (message.resend()) continue;
            msgToRequeue.put(deliveryTag, message);
        }
        if (_logger.isDebugEnabled() && !msgToRequeue.isEmpty()) {
            _logger.debug("Preparing (" + msgToRequeue.size() + ") message to requeue");
        }
        for (Map.Entry entry : msgToRequeue.entrySet()) {
            message = (MessageInstance)entry.getValue();
            deliveryTag = (Long)entry.getKey();
            message.decrementDeliveryCount();
            this._unacknowledgedMessageMap.remove(deliveryTag);
            message.setRedelivered();
            message.release(message.getAcquiringConsumer());
        }
    }

    private void acknowledgeMessage(long deliveryTag, boolean multiple) {
        Collection<MessageInstance> ackedMessages = this.getAckedMessages(deliveryTag, multiple);
        this._transaction.dequeue(ackedMessages, (ServerTransaction.Action)new MessageAcknowledgeAction(ackedMessages));
    }

    private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple) {
        return this._unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
    }

    public UnacknowledgedMessageMap getUnacknowledgedMessageMap() {
        return this._unacknowledgedMessageMap;
    }

    public void setSuspended(boolean suspended) {
        boolean wasSuspended = this._suspended.getAndSet(suspended);
        if (wasSuspended != suspended) {
            if (!suspended && this._logChannelFlowMessages) {
                this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW((String)"Started"));
            }
            if (wasSuspended) {
                for (ConsumerTarget_0_8 s : this.getConsumerTargets()) {
                    for (ConsumerImpl sub : s.getConsumers()) {
                        sub.externalStateChange();
                    }
                }
            }
            if (!wasSuspended) {
                this.ensureConsumersNoticedStateChange();
            }
            if (suspended && this._logChannelFlowMessages) {
                this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW((String)"Stopped"));
            }
        }
    }

    public boolean isSuspended() {
        return this._suspended.get() || this._closing.get() || this._connection.isClosing();
    }

    public void commit(final Runnable immediateAction, boolean async) {
        if (async && this._transaction instanceof LocalTransaction) {
            ((LocalTransaction)this._transaction).commitAsync(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        immediateAction.run();
                    }
                    finally {
                        AMQChannel.this._txnCommits.incrementAndGet();
                        AMQChannel.this._txnStarts.incrementAndGet();
                        AMQChannel.this.decrementOutstandingTxnsIfNecessary();
                    }
                }
            });
        } else {
            this._transaction.commit(immediateAction);
            this._txnCommits.incrementAndGet();
            this._txnStarts.incrementAndGet();
            this.decrementOutstandingTxnsIfNecessary();
        }
        this.resetUncommittedMessages();
    }

    private void resetUncommittedMessages() {
        this._uncommittedMessageSize = 0L;
        this._uncommittedMessages.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rollback(Runnable postRollbackTask) {
        this._rollingBack = true;
        boolean requiresSuspend = this._suspended.compareAndSet(false, true);
        this.ensureConsumersNoticedStateChange();
        try {
            this._transaction.rollback();
        }
        finally {
            this._rollingBack = false;
            this._txnRejects.incrementAndGet();
            this._txnStarts.incrementAndGet();
            this.decrementOutstandingTxnsIfNecessary();
            this.resetUncommittedMessages();
        }
        postRollbackTask.run();
        for (MessageInstance entry : this._resendList) {
            ConsumerImpl sub = entry.getAcquiringConsumer();
            if (sub == null || sub.isClosed()) {
                entry.release(sub);
                continue;
            }
            entry.resend();
        }
        this._resendList.clear();
        if (requiresSuspend) {
            this._suspended.set(false);
            for (ConsumerTarget_0_8 target : this.getConsumerTargets()) {
                for (ConsumerImpl sub : target.getConsumers()) {
                    sub.externalStateChange();
                }
            }
        }
    }

    public String toString() {
        return "(" + this._suspended.get() + ", " + this._closing.get() + ", " + this._connection.isClosing() + ") " + "[" + this._connection.toString() + ":" + this._channelId + "]";
    }

    public boolean isClosing() {
        return this._closing.get();
    }

    public AMQPConnection_0_8 getConnection() {
        return this._connection;
    }

    public void setCredit(long prefetchSize, int prefetchCount) {
        if (!this._prefetchLoggedForChannel) {
            this.getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE((Number)prefetchSize, (Number)prefetchCount));
            this._prefetchLoggedForChannel = true;
        }
        if (prefetchCount <= 1 && prefetchSize == 0L) {
            this._logChannelFlowMessages = false;
        }
        this._creditManager.setCreditLimits(prefetchSize, prefetchCount);
    }

    public MessageStore getMessageStore() {
        return this._messageStore;
    }

    public ClientDeliveryMethod getClientDeliveryMethod() {
        return this._clientDeliveryMethod;
    }

    public RecordDeliveryMethod getRecordDeliveryMethod() {
        return this._recordDeliveryMethod;
    }

    private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle) {
        AMQMessage message = new AMQMessage(handle, this._connection.getReference());
        return message;
    }

    private boolean checkMessageUserId(ContentHeaderBody header) {
        AMQShortString userID = header.getProperties().getUserId();
        return !this._messageAuthorizationRequired || this._connection.getAuthorizedPrincipal().getName().equals(userID == null ? "" : userID.toString());
    }

    public UUID getId() {
        return this._id;
    }

    public AMQPConnection<?> getAMQPConnection() {
        return this._connection;
    }

    public String getClientID() {
        return this._connection.getClientId();
    }

    public LogSubject getLogSubject() {
        return this._logSubject;
    }

    public int compareTo(AMQSessionModel o) {
        return this.getId().compareTo(o.getId());
    }

    public void addDeleteTask(Action<? super AMQChannel> task) {
        this._taskList.add(task);
    }

    public void removeDeleteTask(Action<? super AMQChannel> task) {
        this._taskList.remove(task);
    }

    public Subject getSubject() {
        return this._subject;
    }

    public boolean hasCurrentMessage() {
        return this._currentMessage != null;
    }

    public long getMaxUncommittedInMemorySize() {
        return this._maxUncommittedInMemorySize;
    }

    public synchronized void block() {
        if (this._blockingEntities.add(this) && this._blocking.compareAndSet(false, true)) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_ENFORCED((String)"** All Queues **"));
            this.getConnection().notifyWork();
        }
    }

    public synchronized void unblock() {
        if (this._blockingEntities.remove(this) && this._blockingEntities.isEmpty() && this._blocking.compareAndSet(true, false)) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_REMOVED());
            this.getConnection().notifyWork();
        }
    }

    public synchronized void block(AMQQueue queue) {
        if (this._blockingEntities.add(queue) && this._blocking.compareAndSet(false, true)) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_ENFORCED((String)queue.getName()));
            this.getConnection().notifyWork();
        }
    }

    public synchronized void unblock(AMQQueue queue) {
        if (this._blockingEntities.remove(queue) && this._blockingEntities.isEmpty() && this._blocking.compareAndSet(true, false) && !this.isClosing()) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_REMOVED());
            this.getConnection().notifyWork();
        }
    }

    public void transportStateChanged() {
        this._creditManager.restoreCredit(0L, 0L);
        this._noAckCreditManager.restoreCredit(0L, 0L);
    }

    public Object getConnectionReference() {
        return this.getConnection().getReference();
    }

    public int getUnacknowledgedMessageCount() {
        return this.getUnacknowledgedMessageMap().size();
    }

    private void flow(boolean flow) {
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        ChannelFlowBody responseBody = methodRegistry.createChannelFlowBody(flow);
        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
    }

    public boolean getBlocking() {
        return this._blocking.get();
    }

    public VirtualHostImpl getVirtualHost() {
        return (VirtualHostImpl)this.getConnection().getVirtualHost();
    }

    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) {
        this._transactionTimeoutHelper.checkIdleOrOpenTimes(this._transaction, openWarn, openClose, idleWarn, idleClose);
    }

    private void deadLetter(long deliveryTag) {
        UnacknowledgedMessageMap unackedMap = this.getUnacknowledgedMessageMap();
        MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
        if (rejectedQueueEntry == null) {
            _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
        } else {
            TransactionLogResource owningResource;
            final ServerMessage msg = rejectedQueueEntry.getMessage();
            int requeues = 0;
            if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer())) {
                requeues = rejectedQueueEntry.routeToAlternate((Action)new Action<MessageInstance>(){

                    public void performAction(MessageInstance requeueEntry) {
                        AMQChannel.this.getVirtualHost().getEventLogger().message(AMQChannel.this._logSubject, ChannelMessages.DEADLETTERMSG((Number)msg.getMessageNumber(), (String)requeueEntry.getOwningResource().getName()));
                    }
                }, null);
            }
            if (requeues == 0 && (owningResource = rejectedQueueEntry.getOwningResource()) instanceof AMQQueue) {
                AMQQueue queue = (AMQQueue)owningResource;
                Exchange altExchange = queue.getAlternateExchange();
                if (altExchange == null) {
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
                    }
                    this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH((Number)msg.getMessageNumber(), (String)queue.getName(), (String)msg.getInitialRoutingAddress()));
                } else {
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
                    }
                    this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.DISCARDMSG_NOROUTE((Number)msg.getMessageNumber(), (String)altExchange.getName()));
                }
            }
        }
    }

    public void recordFuture(ListenableFuture<Void> future, ServerTransaction.Action action) {
        this._unfinishedCommandsQueue.add(new AsyncCommand(future, action));
    }

    public void sync() {
        AsyncCommand cmd;
        if (_logger.isDebugEnabled()) {
            _logger.debug("sync() called on channel " + this.debugIdentity());
        }
        while ((cmd = this._unfinishedCommandsQueue.poll()) != null) {
            cmd.complete();
        }
        if (this._transaction instanceof LocalTransaction) {
            ((LocalTransaction)this._transaction).sync();
        }
    }

    public int getConsumerCount() {
        return this._tag2SubscriptionTargetMap.size();
    }

    public Collection<Consumer<?>> getConsumers() {
        return Collections.unmodifiableCollection(this._consumers);
    }

    private void consumerAdded(Consumer<?> consumer) {
        for (ConsumerListener l : this._consumerListeners) {
            l.consumerAdded(consumer);
        }
    }

    private void consumerRemoved(Consumer<?> consumer) {
        for (ConsumerListener l : this._consumerListeners) {
            l.consumerRemoved(consumer);
        }
    }

    public void addConsumerListener(ConsumerListener listener) {
        this._consumerListeners.add(listener);
    }

    public void removeConsumerListener(ConsumerListener listener) {
        this._consumerListeners.remove(listener);
    }

    public void setModelObject(Session<?> session) {
        this._modelObject = session;
    }

    public Session<?> getModelObject() {
        return this._modelObject;
    }

    public long getTransactionStartTime() {
        ServerTransaction serverTransaction = this._transaction;
        if (serverTransaction.isTransactional()) {
            return serverTransaction.getTransactionStartTime();
        }
        return 0L;
    }

    public long getTransactionUpdateTime() {
        ServerTransaction serverTransaction = this._transaction;
        if (serverTransaction.isTransactional()) {
            return serverTransaction.getTransactionUpdateTime();
        }
        return 0L;
    }

    public void receiveAccessRequest(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] AccessRequest[" + " realm: " + realm + " exclusive: " + exclusive + " passive: " + passive + " active: " + active + " write: " + write + " read: " + read + " ]");
        }
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        if (ProtocolVersion.v0_91.equals((Object)this._connection.getProtocolVersion())) {
            this._connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9", this._channelId);
        } else {
            AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
            this.sync();
            this._connection.writeFrame((AMQDataBlock)response.generateFrame(this._channelId));
        }
    }

    public void receiveBasicAck(long deliveryTag, boolean multiple) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicAck[" + " deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
        }
        this.acknowledgeMessage(deliveryTag, multiple);
    }

    public void receiveBasicCancel(AMQShortString consumerTag, boolean nowait) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicCancel[" + " consumerTag: " + consumerTag + " noWait: " + nowait + " ]");
        }
        this.unsubscribeConsumer(consumerTag);
        if (!nowait) {
            MethodRegistry methodRegistry = this._connection.getMethodRegistry();
            BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
            this.sync();
            this._connection.writeFrame((AMQDataBlock)cancelOkBody.generateFrame(this._channelId));
        }
    }

    public void receiveBasicConsume(AMQShortString queue, AMQShortString consumerTag, boolean noLocal, boolean noAck, boolean exclusive, boolean nowait, FieldTable arguments) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicConsume[" + " queue: " + queue + " consumerTag: " + consumerTag + " noLocal: " + noLocal + " noAck: " + noAck + " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }
        AMQShortString consumerTag1 = consumerTag;
        VirtualHostImpl vHost = this.getVirtualHost();
        this.sync();
        String queueName = AMQShortString.toString((AMQShortString)queue);
        AMQQueue queue1 = queueName == null ? this.getDefaultQueue() : vHost.getAttainedMessageSource(queueName);
        HashSet<MessageSource> sources = new HashSet<MessageSource>();
        if (queue1 != null) {
            sources.add((MessageSource)queue1);
        } else if (((Boolean)vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")).booleanValue() && arguments != null && arguments.get("x-multiqueue") instanceof Collection) {
            for (Object object : (Collection)arguments.get("x-multiqueue")) {
                String sourceName = String.valueOf(object);
                if ((sourceName = sourceName.trim()).length() == 0) continue;
                MessageSource source = vHost.getAttainedMessageSource(sourceName);
                if (source == null) {
                    sources.clear();
                    break;
                }
                sources.add(source);
            }
            queueName = arguments.get("x-multiqueue").toString();
        }
        if (sources.isEmpty()) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("No queue for '" + queueName + "'");
            }
            if (queueName != null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
            } else {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined.", this._channelId);
            }
        } else {
            try {
                consumerTag1 = this.consumeFromSource(consumerTag1, sources, !noAck, arguments, exclusive, noLocal);
                if (!nowait) {
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    BasicConsumeOkBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
                }
            }
            catch (ConsumerTagInUseException cte) {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + consumerTag1 + "'", this._channelId);
            }
            catch (AMQInvalidArgumentException ise) {
                this._connection.sendConnectionClose(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), this._channelId);
            }
            catch (MessageSource.ExistingExclusiveConsumer e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue '" + queue1.getName() + "' as it already has an existing exclusive consumer", this._channelId);
            }
            catch (MessageSource.ExistingConsumerPreventsExclusive e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue '" + queue1.getName() + "' exclusively as it already has a consumer", this._channelId);
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue '" + queue1.getName() + "' permission denied", this._channelId);
            }
            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue '" + queue1.getName() + "' as it already has an incompatible exclusivity policy", this._channelId);
            }
        }
    }

    public void receiveBasicGet(AMQShortString queueName, boolean noAck) {
        AMQQueue queue;
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicGet[" + " queue: " + queueName + " noAck: " + noAck + " ]");
        }
        VirtualHostImpl vHost = this.getVirtualHost();
        this.sync();
        Object object = queue = queueName == null ? this.getDefaultQueue() : vHost.getAttainedMessageSource(queueName.toString());
        if (queue == null) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("No queue for '" + queueName + "'");
            }
            if (queueName != null) {
                this._connection.sendConnectionClose(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", this._channelId);
            } else {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined.", this._channelId);
            }
        } else {
            try {
                if (!this.performGet((MessageSource)queue, !noAck)) {
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
                }
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this._channelId);
            }
            catch (MessageSource.ExistingExclusiveConsumer e) {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", this._channelId);
            }
            catch (MessageSource.ExistingConsumerPreventsExclusive e) {
                this._connection.sendConnectionClose(AMQConstant.INTERNAL_ERROR, "The GET request has been evaluated as an exclusive consumer, this is likely due to a programming error in the Qpid broker", this._channelId);
            }
            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue has an incompatible exclusivity policy", this._channelId);
            }
        }
    }

    public void receiveBasicPublish(AMQShortString exchangeName, AMQShortString routingKey, boolean mandatory, boolean immediate) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicPublish[" + " exchange: " + exchangeName + " routingKey: " + routingKey + " mandatory: " + mandatory + " immediate: " + immediate + " ]");
        }
        VirtualHostImpl vHost = this.getVirtualHost();
        if (this.blockingTimeoutExceeded()) {
            this.getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
            this.closeChannel(AMQConstant.MESSAGE_TOO_LARGE, "Channel flow control was requested, but not enforced by sender");
        } else {
            MessageDestination destination = this.isDefaultExchange(exchangeName) ? vHost.getDefaultDestination() : vHost.getAttainedMessageDestination(exchangeName.toString());
            if (destination == null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: '" + exchangeName + "'");
            } else {
                MessagePublishInfo info = new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey);
                try {
                    this.setPublishFrame(info, destination);
                }
                catch (AccessControlException e) {
                    this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
                }
            }
        }
    }

    private boolean blockingTimeoutExceeded() {
        return this._wireBlockingState && System.currentTimeMillis() - this._blockTime > this._blockingTimeout;
    }

    public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicQos[" + " prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]");
        }
        this.sync();
        this.setCredit(prefetchSize, prefetchCount);
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        BasicQosOkBody responseBody = methodRegistry.createBasicQosOkBody();
        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
    }

    public void receiveBasicRecover(boolean requeue, boolean sync) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]");
        }
        this.resend();
        if (sync) {
            MethodRegistry methodRegistry = this._connection.getMethodRegistry();
            BasicRecoverSyncOkBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
            this.sync();
            this._connection.writeFrame((AMQDataBlock)recoverOk.generateFrame(this.getChannelId()));
        }
    }

    public void receiveBasicReject(long deliveryTag, boolean requeue) {
        MessageInstance message;
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicReject[" + " deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]");
        }
        if ((message = this.getUnacknowledgedMessageMap().get(deliveryTag)) == null) {
            _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
        } else if (message.getMessage() == null) {
            _logger.warn("Message has already been purged, unable to Reject.");
        } else {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + requeue + " on channel:" + this.debugIdentity());
            }
            if (requeue) {
                message.decrementDeliveryCount();
                this.requeue(deliveryTag);
            } else {
                boolean maxDeliveryCountEnabled = this.isMaxDeliveryCountEnabled(deliveryTag);
                if (_logger.isDebugEnabled()) {
                    _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
                }
                if (maxDeliveryCountEnabled) {
                    boolean deliveredTooManyTimes = this.isDeliveredTooManyTimes(deliveryTag);
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
                    }
                    if (deliveredTooManyTimes) {
                        this.deadLetter(deliveryTag);
                    } else {
                        message.incrementDeliveryCount();
                    }
                } else {
                    this.requeue(deliveryTag);
                }
            }
        }
    }

    public void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ChannelClose[" + " replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
        }
        this.sync();
        this._connection.closeChannel(this);
        this._connection.writeFrame((AMQDataBlock)new AMQFrame(this.getChannelId(), (AMQBody)this._connection.getMethodRegistry().createChannelCloseOkBody()));
    }

    public void receiveChannelCloseOk() {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ChannelCloseOk");
        }
        this._connection.closeChannelOk(this.getChannelId());
    }

    public void receiveMessageContent(QpidByteBuffer data) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] MessageContent[" + " data: " + Functions.hex((QpidByteBuffer)data, (int)this._connection.getBinaryDataLimit()) + " ] ");
        }
        if (this.hasCurrentMessage()) {
            this.publishContentBody(new ContentBody(data));
        } else {
            this._connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Attempt to send a content header without first sending a publish frame", this._channelId);
        }
    }

    public void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]");
        }
        if (this.hasCurrentMessage()) {
            if (bodySize > this._connection.getMaxMessageSize()) {
                this.closeChannel(AMQConstant.MESSAGE_TOO_LARGE, "Message size of " + bodySize + " greater than allowed maximum of " + this._connection.getMaxMessageSize());
            }
            this.publishContentHeader(new ContentHeaderBody(properties, bodySize));
        } else {
            this._connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Attempt to send a content header without first sending a publish frame", this._channelId);
        }
    }

    public boolean ignoreAllButCloseOk() {
        return this._connection.ignoreAllButCloseOk() || this._connection.channelAwaitingClosure(this._channelId);
    }

    public void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] BasicNack[" + " deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
        }
        LinkedHashMap<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<Long, MessageInstance>();
        this._unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
        for (MessageInstance message : nackedMessageMap.values()) {
            if (message == null) {
                _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
                continue;
            }
            if (message.getMessage() == null) {
                _logger.warn("Message has already been purged, unable to nack.");
                continue;
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Nack-ing: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + requeue + " on channel:" + this.debugIdentity());
            }
            if (requeue) {
                message.decrementDeliveryCount();
                this.requeue(deliveryTag);
                continue;
            }
            message.reject();
            boolean maxDeliveryCountEnabled = this.isMaxDeliveryCountEnabled(deliveryTag);
            if (_logger.isDebugEnabled()) {
                _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
            }
            if (maxDeliveryCountEnabled) {
                boolean deliveredTooManyTimes = this.isDeliveredTooManyTimes(deliveryTag);
                if (_logger.isDebugEnabled()) {
                    _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
                }
                if (deliveredTooManyTimes) {
                    this.deadLetter(deliveryTag);
                    continue;
                }
                message.incrementDeliveryCount();
                continue;
            }
            this.requeue(deliveryTag);
        }
    }

    public void receiveChannelFlow(boolean active) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ChannelFlow[" + " active: " + active + " ]");
        }
        this.sync();
        this.setSuspended(!active);
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        ChannelFlowOkBody responseBody = methodRegistry.createChannelFlowOkBody(active);
        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
    }

    public void receiveChannelFlowOk(boolean active) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ChannelFlowOk[" + " active: " + active + " ]");
        }
    }

    public void receiveExchangeBound(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) {
        String replyText;
        int replyCode;
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ExchangeBound[" + " exchange: " + exchangeName + " routingKey: " + routingKey + " queue: " + queueName + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        this.sync();
        if (this.isDefaultExchange(exchangeName)) {
            MessageSource queue;
            if (routingKey == null) {
                if (queueName == null) {
                    replyCode = virtualHost.getQueues().isEmpty() ? 3 : 0;
                    replyText = null;
                } else {
                    queue = virtualHost.getAttainedMessageSource(queueName.toString());
                    if (queue == null) {
                        replyCode = 2;
                        replyText = "Queue '" + queueName + "' not found";
                    } else {
                        replyCode = 0;
                        replyText = null;
                    }
                }
            } else if (queueName == null) {
                replyCode = virtualHost.getAttainedQueue(routingKey.toString()) == null ? 5 : 0;
                replyText = null;
            } else {
                queue = virtualHost.getAttainedQueue(queueName.toString());
                if (queue == null) {
                    replyCode = 2;
                    replyText = "Queue '" + queueName + "' not found";
                } else {
                    replyCode = queueName.equals(routingKey) ? 0 : 6;
                    replyText = null;
                }
            }
        } else {
            ExchangeImpl exchange = virtualHost.getAttainedExchange(exchangeName.toString());
            if (exchange == null) {
                replyCode = 1;
                replyText = "Exchange '" + exchangeName + "' not found";
            } else if (routingKey == null) {
                if (queueName == null) {
                    if (exchange.hasBindings()) {
                        replyCode = 0;
                        replyText = null;
                    } else {
                        replyCode = 3;
                        replyText = null;
                    }
                } else {
                    AMQQueue queue = virtualHost.getAttainedQueue(queueName.toString());
                    if (queue == null) {
                        replyCode = 2;
                        replyText = "Queue '" + queueName + "' not found";
                    } else if (exchange.isBound(queue)) {
                        replyCode = 0;
                        replyText = null;
                    } else {
                        replyCode = 4;
                        replyText = "Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'";
                    }
                }
            } else if (queueName != null) {
                AMQQueue queue = virtualHost.getAttainedQueue(queueName.toString());
                if (queue == null) {
                    replyCode = 2;
                    replyText = "Queue '" + queueName + "' not found";
                } else {
                    String bindingKey;
                    String string = bindingKey = routingKey == null ? null : routingKey.toString();
                    if (exchange.isBound(bindingKey, queue)) {
                        replyCode = 0;
                        replyText = null;
                    } else {
                        replyCode = 6;
                        replyText = "Queue '" + queueName + "' not bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
                    }
                }
            } else if (exchange.isBound(routingKey == null ? "" : routingKey.toString())) {
                replyCode = 0;
                replyText = null;
            } else {
                replyCode = 5;
                replyText = "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
            }
        }
        ExchangeBoundOkBody exchangeBoundOkBody = methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
        this._connection.writeFrame((AMQDataBlock)exchangeBoundOkBody.generateFrame(this.getChannelId()));
    }

    public void receiveExchangeDeclare(AMQShortString exchangeName, AMQShortString type, boolean passive, boolean durable, boolean autoDelete, boolean internal, boolean nowait, FieldTable arguments) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ExchangeDeclare[" + " exchange: " + exchangeName + " type: " + type + " passive: " + passive + " durable: " + durable + " autoDelete: " + autoDelete + " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        ExchangeDeclareOkBody declareOkBody = methodRegistry.createExchangeDeclareOkBody();
        VirtualHostImpl virtualHost = this.getVirtualHost();
        if (this.isDefaultExchange(exchangeName)) {
            if (!new AMQShortString("direct").equals(type)) {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange:  of type direct to " + type + ".", this.getChannelId());
            } else if (!nowait) {
                this.sync();
                this._connection.writeFrame((AMQDataBlock)declareOkBody.generateFrame(this.getChannelId()));
            }
        } else if (passive) {
            ExchangeImpl exchange = virtualHost.getAttainedExchange(exchangeName.toString());
            if (exchange == null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'");
            } else if (type != null && type.length() != 0 && !exchange.getType().equals(type.toString())) {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: '" + exchangeName + "' of type " + exchange.getType() + " to " + type + ".", this.getChannelId());
            } else if (!nowait) {
                this.sync();
                this._connection.writeFrame((AMQDataBlock)declareOkBody.generateFrame(this.getChannelId()));
            }
        } else {
            String name = exchangeName.toString();
            String typeString = type == null ? null : type.toString();
            try {
                HashMap<String, Object> attributes = new HashMap<String, Object>();
                if (arguments != null) {
                    attributes.putAll(FieldTable.convertToMap((FieldTable)arguments));
                }
                attributes.put("name", name);
                attributes.put("type", typeString);
                attributes.put("durable", durable);
                attributes.put("lifetimePolicy", autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
                if (!attributes.containsKey("alternateExchange")) {
                    attributes.put("alternateExchange", null);
                }
                ExchangeImpl exchange = virtualHost.createExchange(attributes);
                if (!nowait) {
                    this.sync();
                    this._connection.writeFrame((AMQDataBlock)declareOkBody.generateFrame(this.getChannelId()));
                }
            }
            catch (ReservedExchangeNameException e) {
                ExchangeImpl existing = virtualHost.getAttainedExchange(exchangeName.toString());
                if (existing == null || !existing.getType().equals(typeString)) {
                    this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to declare exchange: '" + exchangeName + "' which begins with reserved prefix.", this.getChannelId());
                } else if (!nowait) {
                    this.sync();
                    this._connection.writeFrame((AMQDataBlock)declareOkBody.generateFrame(this.getChannelId()));
                }
            }
            catch (ExchangeExistsException e) {
                ExchangeImpl exchange = e.getExistingExchange();
                if (!exchange.getType().equals(typeString)) {
                    this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: '" + exchangeName + "' of type " + exchange.getType() + " to " + type + ".", this.getChannelId());
                } else if (!nowait) {
                    this.sync();
                    this._connection.writeFrame((AMQDataBlock)declareOkBody.generateFrame(this.getChannelId()));
                }
            }
            catch (NoFactoryForTypeException e) {
                this._connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Unknown exchange type '" + e.getType() + "' for exchange '" + exchangeName + "'", this.getChannelId());
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
            }
            catch (UnknownConfiguredObjectException e) {
                String message = "Unknown alternate exchange " + (e.getName() != null ? "name: '" + e.getName() + "'" : "id: " + e.getId());
                this._connection.sendConnectionClose(AMQConstant.NOT_FOUND, message, this.getChannelId());
            }
            catch (IllegalArgumentException e) {
                this._connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Error creating exchange '" + exchangeName + "': " + e.getMessage(), this.getChannelId());
            }
        }
    }

    public void receiveExchangeDelete(AMQShortString exchangeStr, boolean ifUnused, boolean nowait) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ExchangeDelete[" + " exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        this.sync();
        if (this.isDefaultExchange(exchangeStr)) {
            this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", this.getChannelId());
        } else {
            String exchangeName = exchangeStr.toString();
            ExchangeImpl exchange = virtualHost.getAttainedExchange(exchangeName);
            if (exchange == null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "No such exchange: '" + exchangeStr + "'");
            } else if (ifUnused && exchange.hasBindings()) {
                this.closeChannel(AMQConstant.IN_USE, "Exchange has bindings");
            } else {
                try {
                    exchange.delete();
                    if (!nowait) {
                        ExchangeDeleteOkBody responseBody = this._connection.getMethodRegistry().createExchangeDeleteOkBody();
                        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                    }
                }
                catch (ExchangeIsAlternateException e) {
                    this.closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
                }
                catch (RequiredExchangeException e) {
                    this.closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted");
                }
                catch (AccessControlException e) {
                    this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
                }
            }
        }
    }

    public void receiveQueueBind(AMQShortString queueName, AMQShortString exchange, AMQShortString routingKey, boolean nowait, FieldTable argumentsTable) {
        AMQQueue queue;
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] QueueBind[" + " queue: " + queueName + " exchange: " + exchange + " bindingKey: " + routingKey + " nowait: " + nowait + " arguments: " + argumentsTable + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        if (queueName == null) {
            queue = this.getDefaultQueue();
            if (queue != null && routingKey == null) {
                routingKey = AMQShortString.valueOf((String)queue.getName());
            }
        } else {
            queue = virtualHost.getAttainedQueue(queueName.toString());
            AMQShortString aMQShortString = routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
        }
        if (queue == null) {
            String message = queueName == null ? "No default queue defined on channel and queue was null" : "Queue " + queueName + " does not exist.";
            this.closeChannel(AMQConstant.NOT_FOUND, message);
        } else if (this.isDefaultExchange(exchange)) {
            this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Cannot bind the queue '" + queueName + "' to the default exchange", this.getChannelId());
        } else {
            String exchangeName = exchange.toString();
            ExchangeImpl exch = virtualHost.getAttainedExchange(exchangeName);
            if (exch == null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "Exchange '" + exchangeName + "' does not exist.");
            } else {
                try {
                    Map arguments = FieldTable.convertToMap((FieldTable)argumentsTable);
                    String bindingKey = String.valueOf(routingKey);
                    if (!exch.isBound(bindingKey, arguments, queue) && !exch.addBinding(bindingKey, queue, arguments) && "topic".equals(exch.getType())) {
                        exch.replaceBinding(bindingKey, queue, arguments);
                    }
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
                    }
                    if (!nowait) {
                        this.sync();
                        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                        QueueBindOkBody responseBody = methodRegistry.createQueueBindOkBody();
                        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                    }
                }
                catch (AccessControlException e) {
                    this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
                }
            }
        }
    }

    public void receiveQueueDeclare(AMQShortString queueStr, boolean passive, boolean durable, boolean exclusive, boolean autoDelete, boolean nowait, FieldTable arguments) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] QueueDeclare[" + " queue: " + queueStr + " passive: " + passive + " durable: " + durable + " exclusive: " + exclusive + " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        AMQShortString queueName = queueStr == null || queueStr.length() == 0 ? new AMQShortString("tmp_" + UUID.randomUUID()) : queueStr;
        if (passive) {
            AMQQueue queue = virtualHost.getAttainedQueue(queueName.toString());
            if (queue == null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "Queue: '" + queueName + "' not found on VirtualHost '" + virtualHost.getName() + "'.");
            } else if (!queue.verifySessionAccess((AMQSessionModel)this)) {
                this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '" + queue.getName() + "' is exclusive, but not created on this Connection.", this.getChannelId());
            } else {
                this.setDefaultQueue(queue);
                if (!nowait) {
                    this.sync();
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, (long)queue.getQueueDepthMessages(), (long)queue.getConsumerCount());
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Queue " + queueName + " declared successfully");
                    }
                }
            }
        } else {
            try {
                ExclusivityPolicy exclusivityPolicy;
                LifetimePolicy lifetimePolicy;
                Map attributes = QueueArgumentsConverter.convertWireArgsToModel((Map)FieldTable.convertToMap((FieldTable)arguments));
                String queueNameString = AMQShortString.toString((AMQShortString)queueName);
                attributes.put("name", queueNameString);
                attributes.put("durable", durable);
                if (exclusive) {
                    lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : (durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
                    exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
                } else {
                    lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
                    exclusivityPolicy = ExclusivityPolicy.NONE;
                }
                attributes.put("exclusive", exclusivityPolicy);
                attributes.put("lifetimePolicy", lifetimePolicy);
                AMQQueue queue = virtualHost.createQueue(attributes);
                this.setDefaultQueue(queue);
                if (!nowait) {
                    this.sync();
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, (long)queue.getQueueDepthMessages(), (long)queue.getConsumerCount());
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Queue " + queueName + " declared successfully");
                    }
                }
            }
            catch (QueueExistsException qe) {
                AMQQueue queue = qe.getExistingQueue();
                if (!queue.verifySessionAccess((AMQSessionModel)this)) {
                    this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '" + queue.getName() + "' is exclusive, but not created on this Connection.", this.getChannelId());
                } else if (queue.isExclusive() != exclusive) {
                    this.closeChannel(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + queue.isExclusive() + " requested " + exclusive + ")");
                } else if (autoDelete && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || !autoDelete && queue.getLifetimePolicy() != (exclusive && !durable ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)) {
                    this.closeChannel(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " + queue.getLifetimePolicy() + " requested autodelete: " + autoDelete + ")");
                } else {
                    this.setDefaultQueue(queue);
                    if (!nowait) {
                        this.sync();
                        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                        QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, (long)queue.getQueueDepthMessages(), (long)queue.getConsumerCount());
                        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                        if (_logger.isDebugEnabled()) {
                            _logger.debug("Queue " + queueName + " declared successfully");
                        }
                    }
                }
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
            }
        }
    }

    public void receiveQueueDelete(AMQShortString queueName, boolean ifUnused, boolean ifEmpty, boolean nowait) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] QueueDelete[" + " queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        this.sync();
        AMQQueue queue = queueName == null ? this.getDefaultQueue() : virtualHost.getAttainedQueue(queueName.toString());
        if (queue == null) {
            this.closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
        } else if (ifEmpty && !queue.isEmpty()) {
            this.closeChannel(AMQConstant.IN_USE, "Queue: '" + queueName + "' is not empty.");
        } else if (ifUnused && !queue.isUnused()) {
            this.closeChannel(AMQConstant.IN_USE, "Queue: '" + queueName + "' is still used.");
        } else if (!queue.verifySessionAccess((AMQSessionModel)this)) {
            this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '" + queue.getName() + "' is exclusive, but not created on this Connection.", this.getChannelId());
        } else {
            try {
                int purged = virtualHost.removeQueue(queue);
                if (!nowait || this._connection.isSendQueueDeleteOkRegardless()) {
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody((long)purged);
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                }
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
            }
        }
    }

    public void receiveQueuePurge(AMQShortString queueName, boolean nowait) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] QueuePurge[" + " queue: " + queueName + " nowait: " + nowait + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        AMQQueue queue = null;
        if (queueName == null && (queue = this.getDefaultQueue()) == null) {
            this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "No queue specified.", this.getChannelId());
        } else if (queueName != null && (queue = virtualHost.getAttainedQueue(queueName.toString())) == null) {
            this.closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
        } else if (!queue.verifySessionAccess((AMQSessionModel)this)) {
            this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue is exclusive, but not created on this Connection.", this.getChannelId());
        } else {
            try {
                long purged = queue.clearQueue();
                if (!nowait) {
                    this.sync();
                    MethodRegistry methodRegistry = this._connection.getMethodRegistry();
                    QueuePurgeOkBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                }
            }
            catch (AccessControlException e) {
                this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
            }
        }
    }

    public void receiveQueueUnbind(AMQShortString queueName, AMQShortString exchange, AMQShortString bindingKey, FieldTable arguments) {
        AMQQueue queue;
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] QueueUnbind[" + " queue: " + queueName + " exchange: " + exchange + " bindingKey: " + bindingKey + " arguments: " + arguments + " ]");
        }
        VirtualHostImpl virtualHost = this.getVirtualHost();
        boolean useDefaultQueue = queueName == null;
        AMQQueue aMQQueue = queue = useDefaultQueue ? this.getDefaultQueue() : virtualHost.getAttainedQueue(queueName.toString());
        if (queue == null) {
            String message = useDefaultQueue ? "No default queue defined on channel and queue was null" : "Queue '" + queueName + "' does not exist.";
            this.closeChannel(AMQConstant.NOT_FOUND, message);
        } else if (this.isDefaultExchange(exchange)) {
            this._connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue '" + queue.getName() + "' from the default exchange", this.getChannelId());
        } else {
            ExchangeImpl exch = virtualHost.getAttainedExchange(exchange.toString());
            if (exch == null) {
                this.closeChannel(AMQConstant.NOT_FOUND, "Exchange '" + exchange + "' does not exist.");
            } else if (!exch.hasBinding(String.valueOf(bindingKey), queue)) {
                this.closeChannel(AMQConstant.NOT_FOUND, "No such binding");
            } else {
                try {
                    exch.deleteBinding(String.valueOf(bindingKey), queue);
                    QueueUnbindOkBody responseBody = this._connection.getMethodRegistry().createQueueUnbindOkBody();
                    this.sync();
                    this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this.getChannelId()));
                }
                catch (AccessControlException e) {
                    this._connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), this.getChannelId());
                }
            }
        }
    }

    public void receiveTxSelect() {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] TxSelect");
        }
        this.setLocalTransactional();
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
        this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
    }

    public void receiveTxCommit() {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] TxCommit");
        }
        if (!this.isTransactional()) {
            this.closeChannel(AMQConstant.COMMAND_INVALID, "Fatal error: commit called on non-transactional channel");
        }
        this.commit(new Runnable(){

            @Override
            public void run() {
                MethodRegistry methodRegistry = AMQChannel.this._connection.getMethodRegistry();
                TxCommitOkBody responseBody = methodRegistry.createTxCommitOkBody();
                AMQChannel.this._connection.writeFrame((AMQDataBlock)responseBody.generateFrame(AMQChannel.this._channelId));
            }
        }, true);
    }

    public void receiveTxRollback() {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] TxRollback");
        }
        if (!this.isTransactional()) {
            this.closeChannel(AMQConstant.COMMAND_INVALID, "Fatal error: rollback called on non-transactional channel");
        }
        MethodRegistry methodRegistry = this._connection.getMethodRegistry();
        TxRollbackOkBody responseBody = methodRegistry.createTxRollbackOkBody();
        Runnable task = new Runnable((AMQMethodBody)responseBody){
            final /* synthetic */ AMQMethodBody val$responseBody;
            {
                this.val$responseBody = aMQMethodBody;
            }

            @Override
            public void run() {
                AMQChannel.this._connection.writeFrame((AMQDataBlock)this.val$responseBody.generateFrame(AMQChannel.this._channelId));
            }
        };
        this.rollback(task);
        this.resend();
    }

    public void receiveConfirmSelect(boolean nowait) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("RECV[" + this._channelId + "] ConfirmSelect [ nowait: " + nowait + " ]");
        }
        this._confirmOnPublish = true;
        if (!nowait) {
            this._connection.writeFrame((AMQDataBlock)new AMQFrame(this._channelId, (AMQBody)ConfirmSelectOkBody.INSTANCE));
        }
    }

    private void closeChannel(AMQConstant cause, String message) {
        this._connection.closeChannelAndWriteFrame(this, cause, message);
    }

    private boolean isDefaultExchange(AMQShortString exchangeName) {
        return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName);
    }

    private void setDefaultQueue(AMQQueue<?> queue) {
        AMQQueue<?> currentDefaultQueue = this._defaultQueue;
        if (queue != currentDefaultQueue) {
            if (currentDefaultQueue != null) {
                currentDefaultQueue.removeDeleteTask((Action)this._defaultQueueAssociationClearingTask);
            }
            if (queue != null) {
                queue.addDeleteTask((Action)this._defaultQueueAssociationClearingTask);
            }
        }
        this._defaultQueue = queue;
    }

    private AMQQueue getDefaultQueue() {
        return this._defaultQueue;
    }

    public boolean processPending() {
        boolean consumerListNeedsRefreshing;
        if (!this.getAMQPConnection().isIOThread()) {
            return false;
        }
        boolean desiredBlockingState = this._blocking.get();
        if (desiredBlockingState != this._wireBlockingState) {
            this._wireBlockingState = desiredBlockingState;
            this.flow(!desiredBlockingState);
            long l = this._blockTime = desiredBlockingState ? System.currentTimeMillis() : 0L;
        }
        if (this._consumersWithPendingWork.isEmpty()) {
            this._consumersWithPendingWork.addAll(this.getConsumerTargets());
            consumerListNeedsRefreshing = false;
        } else {
            consumerListNeedsRefreshing = true;
        }
        Iterator<ConsumerTarget_0_8> iter = this._consumersWithPendingWork.iterator();
        boolean consumerHasMoreWork = false;
        while (iter.hasNext()) {
            ConsumerTarget_0_8 target = iter.next();
            iter.remove();
            if (!target.hasPendingWork()) continue;
            consumerHasMoreWork = true;
            target.processPending();
            break;
        }
        return consumerHasMoreWork || consumerListNeedsRefreshing;
    }

    public void addTicker(Ticker ticker) {
        this.getConnection().getAggregateTicker().addTicker(ticker);
        this.getAMQPConnection().notifyWork();
    }

    public void removeTicker(Ticker ticker) {
        this.getConnection().getAggregateTicker().removeTicker(ticker);
    }

    public void notifyConsumerTargetCurrentStates() {
        for (ConsumerTarget_0_8 consumerTarget : this.getConsumerTargets()) {
            consumerTarget.notifyCurrentState();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void ensureConsumersNoticedStateChange() {
        for (ConsumerTarget_0_8 consumerTarget : this.getConsumerTargets()) {
            try {
                consumerTarget.getSendLock();
            }
            finally {
                consumerTarget.releaseSendLock();
            }
        }
    }

    private Collection<ConsumerTarget_0_8> getConsumerTargets() {
        return this._tag2SubscriptionTargetMap.values();
    }

    private class DefaultQueueAssociationClearingTask
    implements Action<AMQQueue> {
        private DefaultQueueAssociationClearingTask() {
        }

        public void performAction(AMQQueue queue) {
            if (queue == AMQChannel.this._defaultQueue) {
                AMQChannel.this._defaultQueue = null;
            }
        }
    }

    private class ConsumerClosedListener
    implements ConfigurationChangeListener {
        private ConsumerClosedListener() {
        }

        public void stateChanged(ConfiguredObject object, State oldState, State newState) {
            if (newState == State.DELETED) {
                AMQChannel.this.consumerRemoved((Consumer)object);
            }
        }

        public void childAdded(ConfiguredObject object, ConfiguredObject child) {
        }

        public void childRemoved(ConfiguredObject object, ConfiguredObject child) {
        }

        public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) {
        }

        public void bulkChangeStart(ConfiguredObject<?> object) {
        }

        public void bulkChangeEnd(ConfiguredObject<?> object) {
        }
    }

    private static class AsyncCommand {
        private final ListenableFuture<Void> _future;
        private ServerTransaction.Action _action;

        public AsyncCommand(ListenableFuture<Void> future, ServerTransaction.Action action) {
            this._future = future;
            this._action = action;
        }

        void complete() {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        this._future.get();
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                        continue;
                    }
                    break;
                }
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof RuntimeException) {
                    throw (RuntimeException)e.getCause();
                }
                if (e.getCause() instanceof Error) {
                    throw (Error)e.getCause();
                }
                throw new ServerScopedRuntimeException(e.getCause());
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            this._action.postCommit();
            this._action = null;
        }
    }

    private class WriteReturnAction
    implements ServerTransaction.Action {
        private final AMQConstant _errorCode;
        private final String _description;
        private final MessageReference<AMQMessage> _reference;

        public WriteReturnAction(AMQConstant errorCode, String description, AMQMessage message) {
            this._errorCode = errorCode;
            this._description = description;
            this._reference = message.newReference();
        }

        public void postCommit() {
            AMQMessage message = (AMQMessage)this._reference.getMessage();
            AMQChannel.this._connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), (MessageContentSource)message, AMQChannel.this._channelId, this._errorCode.getCode(), AMQShortString.validValueOf((Object)this._description));
            this._reference.release();
        }

        public void onRollback() {
            this._reference.release();
        }
    }

    private class MessageAcknowledgeAction
    implements ServerTransaction.Action {
        private Collection<MessageInstance> _ackedMessages;

        public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages) {
            this._ackedMessages = ackedMessages;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void postCommit() {
            try {
                for (MessageInstance entry : this._ackedMessages) {
                    entry.delete();
                }
            }
            finally {
                this._ackedMessages = Collections.emptySet();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onRollback() {
            if (AMQChannel.this._rollingBack) {
                for (MessageInstance entry : this._ackedMessages) {
                    entry.makeAcquisitionStealable();
                }
                AMQChannel.this._resendList.addAll(this._ackedMessages);
            } else {
                try {
                    for (MessageInstance entry : this._ackedMessages) {
                        entry.release(entry.getAcquiringConsumer());
                    }
                }
                finally {
                    this._ackedMessages = Collections.emptySet();
                }
            }
        }
    }

    private final class CapacityCheckAction
    implements Action<MessageInstance> {
        private CapacityCheckAction() {
        }

        public void performAction(MessageInstance entry) {
            TransactionLogResource queue = entry.getOwningResource();
            if (queue instanceof CapacityChecker) {
                ((CapacityChecker)queue).checkCapacity((AMQSessionModel)AMQChannel.this);
            }
        }
    }

    private class ImmediateAction
    implements Action<MessageInstance> {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void performAction(MessageInstance entry) {
            TransactionLogResource queue = entry.getOwningResource();
            if (!entry.getDeliveredToConsumer() && entry.acquire()) {
                LocalTransaction txn = new LocalTransaction(AMQChannel.this._messageStore);
                final AMQMessage message = (AMQMessage)entry.getMessage();
                MessageReference ref = message.newReference();
                try {
                    entry.delete();
                    txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action(){

                        public void postCommit() {
                            ProtocolOutputConverter outputConverter = AMQChannel.this._connection.getProtocolOutputConverter();
                            outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), (MessageContentSource)message, AMQChannel.this._channelId, AMQConstant.NO_CONSUMERS.getCode(), IMMEDIATE_DELIVERY_REPLY_TEXT);
                        }

                        public void onRollback() {
                        }
                    });
                    txn.commit();
                }
                finally {
                    ref.release();
                }
            } else if (queue instanceof CapacityChecker) {
                ((CapacityChecker)queue).checkCapacity((AMQSessionModel)AMQChannel.this);
            }
        }
    }

    private class GetDeliveryMethod
    implements ClientDeliveryMethod {
        private final FlowCreditManager _singleMessageCredit;
        private final MessageSource _queue;
        private boolean _deliveredMessage;

        public GetDeliveryMethod(FlowCreditManager singleMessageCredit, MessageSource queue) {
            this._singleMessageCredit = singleMessageCredit;
            this._queue = queue;
        }

        @Override
        public long deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) {
            this._singleMessageCredit.useCreditForMessage(message.getSize());
            int queueSize = this._queue instanceof AMQQueue ? ((AMQQueue)this._queue).getQueueDepthMessages() : 0;
            long size = AMQChannel.this._connection.getProtocolOutputConverter().writeGetOk(message, props, AMQChannel.this.getChannelId(), deliveryTag, queueSize);
            this._deliveredMessage = true;
            return size;
        }

        public boolean hasDeliveredMessage() {
            return this._deliveredMessage;
        }
    }

    private class NoLocalFilter
    implements MessageFilter {
        private final Object _connectionReference;

        public NoLocalFilter() {
            this._connectionReference = AMQChannel.this.getConnectionReference();
        }

        public String getName() {
            return AMQPFilterTypes.NO_LOCAL.toString();
        }

        public boolean matches(Filterable message) {
            return message.getConnectionReference() != this._connectionReference;
        }

        public boolean startAtTail() {
            return false;
        }

        public String toString() {
            return "NoLocalFilter[]";
        }
    }
}

