package org.mule.transport.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.MessageExchangePattern;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.lifecycle.CreateException;
import org.mule.api.lifecycle.StartException;
import org.mule.api.transaction.Transaction;
import org.mule.api.transaction.TransactionException;
import org.mule.api.transport.Connector;
import org.mule.api.transport.MessageReceiver;
import org.mule.config.i18n.MessageFactory;
import org.mule.transport.AbstractMessageReceiver;
import org.mule.transport.AbstractReceiverWorker;
import org.mule.transport.amqp.AmqpConnector;

/* loaded from: input_file:org/mule/transport/amqp/AmqpMessageReceiver.class */
public class AmqpMessageReceiver extends AbstractMessageReceiver {
    protected final AmqpConnector amqpConnector;
    protected volatile AmqpConnector.InboundConnection inboundConnection;
    protected volatile String consumerTag;

    /* loaded from: input_file:org/mule/transport/amqp/AmqpMessageReceiver$AmqpConsumer.class */
    public final class AmqpConsumer extends DefaultConsumer {
        public AmqpConsumer(Channel channel) {
            super(channel);
        }

        public void handleCancel(String str) throws IOException {
            AmqpMessageReceiver.this.logger.warn("Received external cancellation of consumer tag: " + str + ", the message receiver will try to restart.");
            AmqpMessageReceiver.this.restart(false);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            AmqpMessageReceiver.this.logger.warn("Received shutdown signal for consumer tag: " + str + ", the message receiver will try to restart.", shutdownSignalException);
            AmqpMessageReceiver.this.restart(false);
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            AmqpMessage amqpMessage = new AmqpMessage(str, envelope, basicProperties, bArr);
            if (AmqpMessageReceiver.this.logger.isDebugEnabled()) {
                AmqpMessageReceiver.this.logger.debug("Received: " + amqpMessage + " from: " + super.getChannel());
            }
            AmqpMessageReceiver.this.deliverAmqpMessage(amqpMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/transport/amqp/AmqpMessageReceiver$AmqpWorker.class */
    public final class AmqpWorker extends AbstractReceiverWorker {
        private final Log logger;
        private final Channel channel;
        private final AmqpMessage amqpMessage;

        private AmqpWorker(Channel channel, AmqpMessage amqpMessage) {
            super(new ArrayList(1), AmqpMessageReceiver.this);
            this.logger = LogFactory.getLog(AmqpWorker.class);
            this.messages.add(amqpMessage);
            this.channel = channel;
            this.amqpMessage = amqpMessage;
        }

        protected void bindTransaction(Transaction transaction) throws TransactionException {
            if (this.channel == null) {
                throw new TransactionException(MessageFactory.createStaticMessage("Channel is null so can't bind transaction: " + transaction + " to message:" + this.amqpMessage));
            }
            transaction.bindResource(this.channel.getConnection(), this.channel);
        }

        protected void preRouteMuleMessage(MuleMessage muleMessage) throws Exception {
            if (AmqpMessageReceiver.this.getEndpoint().getExchangePattern() == MessageExchangePattern.REQUEST_RESPONSE && muleMessage.getReplyTo() == null) {
                this.logger.warn(String.format("Impossible to honor the request-response exchange pattern of %s for AMQP message without reply to: %s", AmqpMessageReceiver.this.getEndpoint(), muleMessage));
            }
            AmqpMessageReceiver.this.amqpConnector.addInvocationPropertiesIfNecessary(this.channel, this.amqpMessage, muleMessage);
        }

        protected void handleResults(List list) throws Exception {
            AmqpMessageReceiver.this.amqpConnector.ackMessageIfNecessary(this.channel, this.amqpMessage, this.endpoint);
        }
    }

    public AmqpMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint inboundEndpoint) throws CreateException {
        super(connector, flowConstruct, inboundEndpoint);
        this.amqpConnector = (AmqpConnector) connector;
    }

    public void doStart() throws MuleException {
        this.inboundConnection = this.amqpConnector.connect((MessageReceiver) this);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Connected queue: " + getQueueName() + " on channel: " + getChannel());
        }
        try {
            if (this.endpoint.getTransactionConfig().isTransacted()) {
                getChannel().txSelect();
            }
            this.consumerTag = getChannel().basicConsume(getQueueName(), this.amqpConnector.getAckMode().isAutoAck(), getClientConsumerTag(), this.amqpConnector.isNoLocal(), this.amqpConnector.isExclusiveConsumers(), (Map) null, new AmqpConsumer(getChannel()));
            this.logger.info("Started subscription: " + this.consumerTag + " on " + (this.endpoint.getTransactionConfig().isTransacted() ? "transacted " : "") + "channel: " + getChannel());
        } catch (Exception e) {
            throw new StartException(MessageFactory.createStaticMessage("Error when subscribing to queue: " + getQueueName() + " on channel: " + getChannel()), e, this);
        }
    }

    public void doStop() {
        Channel channel = null;
        try {
            try {
                channel = getChannel();
                if (channel == null) {
                    this.inboundConnection = null;
                    return;
                }
                if (this.consumerTag != null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cancelling subscription of: " + this.consumerTag + " on channel: " + channel);
                    }
                    channel.basicCancel(this.consumerTag);
                    this.logger.info("Cancelled subscription of: " + this.consumerTag + " on channel: " + channel);
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Disconnecting receiver for queue: " + getQueueName() + " from channel: " + channel);
                }
                this.amqpConnector.closeChannel(channel);
                this.inboundConnection = null;
            } catch (Exception e) {
                this.logger.warn(MessageFactory.createStaticMessage("Failed to cancel subscription: " + this.consumerTag + " on channel: " + channel), e);
                this.inboundConnection = null;
            }
        } catch (Throwable th) {
            this.inboundConnection = null;
            throw th;
        }
    }

    protected void restart(boolean z) {
        if (!z) {
            this.consumerTag = null;
        }
        try {
            doStop();
            doStart();
        } catch (Exception e) {
            this.logger.error("Failed to restart: " + this, e);
        }
    }

    protected Channel getChannel() {
        if (this.inboundConnection == null) {
            return null;
        }
        return this.inboundConnection.getChannel();
    }

    protected String getQueueName() {
        if (this.inboundConnection == null) {
            return null;
        }
        return this.inboundConnection.getQueue();
    }

    protected String getClientConsumerTag() {
        return AmqpEndpointUtil.getConsumerTag(getEndpoint());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deliverAmqpMessage(AmqpMessage amqpMessage) {
        try {
            new AmqpWorker(getChannel(), amqpMessage).processMessages();
        } catch (Exception e) {
            getConnector().getMuleContext().getExceptionListener().handleException(e);
        }
    }
}
