package com.mule.extensions.amqp.internal.listener;

import com.mule.extensions.amqp.api.config.AckMode;
import com.mule.extensions.amqp.api.config.ListenerQualityOfService;
import com.mule.extensions.amqp.api.exception.AmqpQueueNotFoundException;
import com.mule.extensions.amqp.api.listener.RecoverStrategy;
import com.mule.extensions.amqp.api.message.AmqpAttributes;
import com.mule.extensions.amqp.api.model.QueueDefinition;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.AmqpConfig;
import com.mule.extensions.amqp.internal.config.InternalAckMode;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.entity.AmqpQueueDeclarer;
import com.mule.extensions.amqp.internal.listener.MultiChannelReceiverManager;
import com.mule.extensions.amqp.internal.metadata.AmqpOutputResolver;
import com.mule.extensions.amqp.internal.publisher.DefaultAmqpMessagePublisher;
import com.mule.extensions.amqp.internal.source.AmqpResponseMessageBuilder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.transformation.TransformationService;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.annotation.source.OnBackPressure;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.source.BackPressureContext;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.runtime.source.SourceCompletionCallback;
import org.mule.runtime.extension.api.runtime.source.SourceResult;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@BackPressure(defaultMode = BackPressureMode.WAIT, supportedModes = {BackPressureMode.WAIT})
@MetadataScope(outputResolver = AmqpOutputResolver.class)
@MediaType(value = "*/*", strict = false)
@Alias("listener")
@EmitsResponse
/* loaded from: input_file:com/mule/extensions/amqp/internal/listener/AmqpListener.class */
public class AmqpListener extends Source<InputStream, AmqpAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpListener.class);
    static final String AMQP_CHANNEL = "AMQP_CHANNEL";
    static final String AMQP_DELIVERY_TAG = "AMQP_DELIVERY_TAG";
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";

    @Inject
    private AmqpChannelManager channelManager;

    @Inject
    private TransformationService transformationService;

    @Connection
    protected ConnectionProvider<AmqpTransactionalConnection> connectionProvider;
    private AmqpTransactionalConnection connection;
    private SourceTransactionalAction transactionalAction;

    @Config
    protected AmqpConfig config;
    private MultiChannelReceiverManager multiChannelReceiverManager = null;

    @Parameter
    private String queueName;

    @Inject
    private MuleContext muleContext;

    @Optional
    @Parameter
    @Summary("Declaration of a queue definition to use in case no queue with the queueName provided exists in the broker.")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private QueueDefinition fallbackQueueDefinition;

    @Optional
    @Parameter
    @Summary("The acknowledgment mode to use when consuming from the AMQP broker.")
    @ConfigOverride
    private AckMode ackMode;

    @ConfigOverride
    @Parameter
    @Summary("The number of channels that are spawned per inbound endpoint to receive AMQP messages.")
    private int numberOfConsumers;

    @Optional
    @Parameter
    @Summary("A client-generated consumer tag to establish context.")
    private String consumerTag;

    @Optional(defaultValue = "REQUEUE")
    @Parameter
    @Summary("Recovery strategy in case of error")
    private RecoverStrategy recoverStrategy;

    @Optional
    @Parameter
    @Summary("The default encoding of the message body to be used if the message doesn't communicate it")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private String inboundEncoding;

    @Optional
    @Parameter
    @Summary("The content type encoding of the message body to be used if the message doesn't communicate it")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private String inboundContentType;

    @ConfigOverride
    @Parameter
    @Summary("Whether non existing queues will be created according to the fallback definition or an error will be raised if they do not exist.")
    private boolean createFallbackQueue;

    @ParameterGroup(name = "Listener Quality of Service Config", showInDsl = true)
    @Summary("Listener Quality of Service Config")
    @ConfigOverride
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private ListenerQualityOfService qualityOfService;
    private InternalAckMode resolvedAckMode;

    public void onStart(SourceCallback<InputStream, AmqpAttributes> sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting AMQP Message Listener");
        }
        this.resolvedAckMode = this.transactionalAction.equals(SourceTransactionalAction.ALWAYS_BEGIN) ? InternalAckMode.TRANSACTED : (InternalAckMode) AmqpCommons.resolveOverride(AmqpCommons.toInternalAckMode(this.config.getConsumerConfig().getAckMode()), AmqpCommons.toInternalAckMode(this.ackMode));
        configureConnection(sourceCallback);
        validateNumberOfConsumers(this.numberOfConsumers);
        declareTargetQueueIfNeeded(this.queueName, this.fallbackQueueDefinition, this.createFallbackQueue);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Starting AMQP Listener with [%s] consumers on queue [%s] with AckMode [%s]", Integer.valueOf(this.numberOfConsumers), this.queueName, this.resolvedAckMode.name()));
        }
        this.multiChannelReceiverManager = new MultiChannelReceiverManager.Builder().withAckMode(this.resolvedAckMode).withConnection(this.connection).withConsumerTag(resolveConsumerTag()).withExclusiveConsumers(this.config.getConsumerConfig().isExclusiveConsumers()).withNoLocal(this.config.getConsumerConfig().isNoLocal()).withNumberOfConsumers(this.numberOfConsumers).withQueueName(this.queueName).withSourceCallback(sourceCallback).withInboundContentType(this.inboundContentType).withConfigContentType(this.config.getContentType()).withChannelManager(this.channelManager).withConfigEncoding(this.config.getEncoding()).withInboundEncoding(this.inboundEncoding).withTransactionalAction(this.transactionalAction).withQualityOfService(this.qualityOfService).withMuleContext(this.muleContext).build();
        this.multiChannelReceiverManager.start();
    }

    protected void configureConnection(final SourceCallback<InputStream, AmqpAttributes> sourceCallback) throws ConnectionException {
        this.connection = (AmqpTransactionalConnection) this.connectionProvider.connect();
        this.connection.addShutdownListener(new ShutdownListener() { // from class: com.mule.extensions.amqp.internal.listener.AmqpListener.1
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                if (shutdownSignalException.isInitiatedByApplication()) {
                    return;
                }
                sourceCallback.onConnectionException(new ConnectionException(shutdownSignalException, AmqpListener.this.connection));
            }
        });
    }

    private void validateNumberOfConsumers(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + i + "]. The number should be 1 or greater.");
        }
    }

    @OnSuccess
    public void onSuccess(@ParameterGroup(name = "Response", showInDsl = true) AmqpResponseMessageBuilder amqpResponseMessageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext sourceCallbackContext) {
        if (!sourceCallbackContext.getVariable(AMQP_DELIVERY_TAG).isPresent() || !sourceCallbackContext.getVariable(AMQP_CHANNEL).isPresent()) {
            LOGGER.debug("A successful was not acknowledged though an AUTO acknowledge mode was set. It was possibly previously ack'ed");
            return;
        }
        Long l = (Long) sourceCallbackContext.getVariable(AMQP_DELIVERY_TAG).get();
        Channel channel = (Channel) sourceCallbackContext.getVariable(AMQP_CHANNEL).get();
        if (this.ackMode == AckMode.AUTO) {
            try {
                channel.basicAck(l.longValue(), false);
            } catch (IOException e) {
                LOGGER.error("Error while trying to acknowledge a message.");
                return;
            }
        }
        sourceCallbackContext.getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(str -> {
            doReply(amqpResponseMessageBuilder, sourceCallbackContext, str, correlationInfo, channel);
        });
    }

    @OnBackPressure
    public void onBackPressure(BackPressureContext backPressureContext, SourceCompletionCallback sourceCompletionCallback) {
        LOGGER.warn("Back pressure applied on the message source. The AMQP consumers will be temporarily suspended");
        this.multiChannelReceiverManager.suspendConsumers();
    }

    @OnTerminate
    public void onTerminate(SourceResult sourceResult) {
    }

    private void doReply(AmqpResponseMessageBuilder amqpResponseMessageBuilder, SourceCallbackContext sourceCallbackContext, String str, CorrelationInfo correlationInfo, Channel channel) {
        DefaultAmqpMessagePublisher build = DefaultAmqpMessagePublisher.Builder.newInstance().withChannel(channel).withExchangeName("").withRequestBrokerConfirms(((Boolean) AmqpCommons.resolveOverride(Boolean.valueOf(this.config.getPublisherConfig().isRequestBrokerConfirms()), Boolean.valueOf(amqpResponseMessageBuilder.isRequestBrokerConfirms()))).booleanValue()).withReturnedMessageExchange((String) AmqpCommons.resolveOverride(this.config.getPublisherConfig().getReturnedMessageExchange(), amqpResponseMessageBuilder.getReturnedMessageExchange())).build();
        amqpResponseMessageBuilder.overridePriorityIfNeeded(Integer.valueOf(this.config.getPublisherConfig().getPriority()));
        build.publish(amqpResponseMessageBuilder.build(((Boolean) AmqpCommons.resolveOverride(Boolean.valueOf(this.config.getPublisherConfig().isImmediate()), null)).booleanValue(), ((Boolean) AmqpCommons.resolveOverride(Boolean.valueOf(this.config.getPublisherConfig().isMandatory()), null)).booleanValue(), str, "", this.config.getContentType(), this.config.getEncoding(), amqpResponseMessageBuilder.getDeliveryMode(), OutboundCorrelationStrategy.AUTO, correlationInfo, this.transformationService));
    }

    @OnError
    public void onError(Error error, SourceCallbackContext sourceCallbackContext) {
        LOGGER.debug("An error ocurred when processing messages.");
        if (sourceCallbackContext.getVariable(AMQP_CHANNEL).isPresent() && sourceCallbackContext.getVariable(AMQP_DELIVERY_TAG).isPresent()) {
            Long l = (Long) sourceCallbackContext.getVariable(AMQP_DELIVERY_TAG).get();
            Channel channel = (Channel) sourceCallbackContext.getVariable(AMQP_CHANNEL).get();
            if (mustApplyRecoverStrategy(error)) {
                applyRecoverStrategy(sourceCallbackContext, channel, l);
            } else {
                rejectOnRedeliveryExhaustion(l, channel);
            }
        }
    }

    private void rejectOnRedeliveryExhaustion(Long l, Channel channel) {
        try {
            if (channel.isOpen()) {
                channel.basicReject(l.longValue(), false);
            }
        } catch (IOException e) {
            LOGGER.warn("Error while rejecting a message after exhaustion of redeliveries", e);
        }
    }

    private boolean mustApplyRecoverStrategy(Error error) {
        if (error.getErrorType() == null || error.getErrorType().getIdentifier() != null) {
            return error.getErrorType() == null || !error.getErrorType().getIdentifier().equals(Errors.ComponentIdentifiers.Handleable.REDELIVERY_EXHAUSTED.getName());
        }
        LOGGER.warn("An error type without identifier was handled: {}", error.getErrorType());
        return true;
    }

    protected void applyRecoverStrategy(SourceCallbackContext sourceCallbackContext, Channel channel, Long l) {
        switch (this.recoverStrategy) {
            case NONE:
            default:
                return;
            case NO_REQUEUE:
                recoverMessageFromChannel(sourceCallbackContext, false, channel, l);
                return;
            case REQUEUE:
                recoverMessageFromChannel(sourceCallbackContext, true, channel, l);
                return;
        }
    }

    private void recoverMessageFromChannel(SourceCallbackContext sourceCallbackContext, boolean z, Channel channel, Long l) {
        try {
            if (channel.isOpen()) {
                channel.basicReject(l.longValue(), z);
                this.channelManager.unbindChannel();
                LOGGER.debug("Applied " + this.recoverStrategy + " recover strategy on channel: " + channel);
            } else {
                LOGGER.warn("Channel closed. Cannot reject message with delivery tag {} on channel {}", l, channel);
            }
        } catch (IOException e) {
            LOGGER.error("Error while trying to recover a channel.");
        }
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Stopping AMQP Listener on queue [%s]", this.queueName));
        }
        if (this.multiChannelReceiverManager != null) {
            this.multiChannelReceiverManager.stop();
        }
        if (this.connection != null) {
            this.connectionProvider.disconnect(this.connection);
        }
    }

    private String resolveConsumerTag() {
        return this.consumerTag == null ? "" : this.consumerTag;
    }

    private void declareTargetQueueIfNeeded(String str, QueueDefinition queueDefinition, boolean z) throws ConnectionException {
        try {
            Channel createChannel = this.connection.createChannel();
            boolean queueExists = new AmqpQueueDeclarer(createChannel, queueDefinition, str).queueExists();
            if ((queueDefinition == null || !z) && !queueExists) {
                throw new ConnectionException(new AmqpQueueNotFoundException("Queue defined in listener was not found and no fallback definition declared."), this.connection);
            }
            if (queueDefinition != null && !queueExists) {
                createChannel = this.connection.createChannel();
                new AmqpQueueDeclarer(createChannel, queueDefinition, str).declareActive();
            }
            createChannel.close();
        } catch (IOException e) {
            String str2 = "A Connection error occurred while declaring the queue with name " + str + ".";
            LOGGER.error(str2, e);
            throw new ConnectionException(str2, e, (ErrorType) null, this.connection);
        } catch (TimeoutException e2) {
            String str3 = "A Timeout error occurred while declaring the queue with name " + str + ".";
            LOGGER.error(str3, e2);
            throw new ConnectionException(str3, e2, (ErrorType) null, this.connection);
        }
    }
}
