package com.mule.extensions.amqp.internal.operation;

import com.mule.extensions.amqp.api.config.AmqpConsumerConfig;
import com.mule.extensions.amqp.api.config.ConsumerAckMode;
import com.mule.extensions.amqp.api.exception.AmqpConsumeException;
import com.mule.extensions.amqp.api.exception.AmqpConsumerErrorTypeProvider;
import com.mule.extensions.amqp.api.exception.AmqpCreationNotAllowedException;
import com.mule.extensions.amqp.api.exception.AmqpExtensionException;
import com.mule.extensions.amqp.api.exception.AmqpQueueNotFoundException;
import com.mule.extensions.amqp.api.message.AmqpAttributes;
import com.mule.extensions.amqp.api.model.QueueDefinition;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.AmqpConfig;
import com.mule.extensions.amqp.internal.config.InternalAckMode;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel;
import com.mule.extensions.amqp.internal.entity.AmqpQueueDeclarer;
import com.mule.extensions.amqp.internal.exception.resolver.ConsumeExceptionResolver;
import com.mule.extensions.amqp.internal.message.AmqpMessage;
import com.mule.extensions.amqp.internal.message.AmqpResultFactory;
import com.mule.extensions.amqp.internal.metadata.AmqpOutputResolver;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.metadata.OutputResolver;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mule/extensions/amqp/internal/operation/AmqpConsume.class */
/**
 * Operation holder for the AMQP {@code consume} operation: takes one message from a queue,
 * optionally declaring the queue first, and returns its body and attributes.
 */
public final class AmqpConsume {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConsume.class);

    @Inject
    private AmqpChannelManager channelManager;
    private ConsumeExceptionResolver exceptionResolver = new ConsumeExceptionResolver();
    private final AmqpResultFactory resultFactory = new AmqpResultFactory();

    /**
     * Consumes a single message from the queue named {@code str}.
     *
     * <p>The effective ACK mode is resolved from the configuration and the {@code consumerAckMode}
     * override; it is forced to {@link InternalAckMode#TRANSACTED} when the created channel is in a
     * transaction. The channel is released when the operation fails, or when it succeeds with any
     * ACK mode other than {@link InternalAckMode#MANUAL}.
     *
     * @param amqpConfig                    connector configuration (consumer config, QoS, default content type and encoding)
     * @param amqpTransactionalConnection   connection used to create channels and consumers
     * @param str                           name of the queue to consume from
     * @param str2                          content type override for the message body
     * @param str3                          encoding override for the message body
     * @param queueDefinition               definition used to declare the queue when it does not exist; may be {@code null}
     * @param consumerAckMode               ACK mode override for this operation
     * @param str4                          consumer tag to use
     * @param l                             maximum time to wait for a message
     * @param timeUnit                      unit of {@code l}
     * @param z                             whether a missing queue may be created from {@code queueDefinition}
     * @param operationTransactionalAction  transactional action passed through to channel creation
     * @return the result produced by {@link AmqpResultFactory} for the consumed message
     * @throws AmqpExtensionException declared by the operation; any {@link Exception} raised while
     *                                consuming is rethrown as an {@link AmqpConsumeException} whose cause
     *                                is the exception returned by the {@link ConsumeExceptionResolver}
     */
    @OutputResolver(output = AmqpOutputResolver.class)
    @Throws({AmqpConsumerErrorTypeProvider.class})
    @MediaType(value = "*/*", strict = false)
    public Result<InputStream, AmqpAttributes> consume(
            @Config AmqpConfig amqpConfig,
            @Connection AmqpTransactionalConnection amqpTransactionalConnection,
            @Summary("The name of the queue from where the Message should be consumed") String str,
            @Example("application/json") @Optional @Summary("The content type of the message body") String str2,
            @Example("UTF-8") @Optional @Summary("The encoding of the message body") String str3,
            @Optional @Summary("The queue definition to use for queue declaration in case there is no queue with the queueName") @Expression(ExpressionSupport.NOT_SUPPORTED) QueueDefinition queueDefinition,
            @Optional @Summary("The ACK mode to use when consuming a message") ConsumerAckMode consumerAckMode,
            @Optional @Summary("The consumer tag to use for the consumer involved in the operation") String str4,
            @Optional(defaultValue = "10000") @Summary("Maximum time to wait for a message to arrive before timeout") Long l,
            @Example("MILLISECONDS") @Optional(defaultValue = "MILLISECONDS") @Summary("Time unit to be used in the maximumWaitTime configuration") TimeUnit timeUnit,
            @ConfigOverride boolean z,
            OperationTransactionalAction operationTransactionalAction) throws AmqpExtensionException {
        InternalAckMode ackMode = resolveAck(amqpConfig.getConsumerConfig(), consumerAckMode);
        MuleAmqpChannel channel = null;
        boolean failed = false;
        boolean interrupted = false;
        try {
            LOGGER.debug("Begin [consume] on : [{}]", str);

            MuleAmqpChannel initialChannel = AmqpCommons.createAmqpChannel(amqpTransactionalConnection, this.channelManager,
                    operationTransactionalAction, amqpConfig.getQualityOfService(), true);
            if (initialChannel.isInTransaction()) {
                ackMode = InternalAckMode.TRANSACTED;
            }

            // NOTE(review): initialChannel is only tracked for release once queue resolution
            // returns; if declareTargetQueueIfNeeded throws, nothing is released here. Confirm
            // whether releaseChannelIfNeeded is safe to call on that channel before changing this.
            channel = declareTargetQueueIfNeeded(amqpTransactionalConnection, str, queueDefinition,
                    operationTransactionalAction, initialChannel, z);

            AmqpMessage message = consumeMessage(amqpTransactionalConnection, str, l, timeUnit, ackMode, str4, channel);
            String ackId = AmqpCommons.getMessageAckId(channel, message.getEnvelope().getDeliveryTag());

            // Operation-level values (str2 / str3) are applied as overrides on top of what is
            // resolved from the message properties and the connector configuration.
            String contentType = (String) AmqpCommons.resolveOverride(
                    AmqpCommons.resolveMessageContentType(message.getProperties().getContentType(), amqpConfig.getContentType()),
                    str2);
            String encoding = (String) AmqpCommons.resolveOverride(
                    AmqpCommons.resolveMessageEncoding(message.getProperties().getContentEncoding(), amqpConfig.getEncoding()),
                    str3);

            AmqpCommons.evaluateMessageAck(channel, message, this.channelManager, ackMode, ackId);
            return this.resultFactory.createResult(message, contentType, encoding, ackId);
        } catch (Exception e) {
            failed = true;
            // Remember the interruption so the thread's interrupt status is not lost when the
            // exception is wrapped below.
            interrupted = e instanceof InterruptedException;
            throw new AmqpConsumeException(
                    String.format("An error occurred while consuming a message from the queue [%s]: %s", str, e.getMessage()),
                    (Exception) this.exceptionResolver.resolveException(e));
        } finally {
            try {
                // With MANUAL ack the channel is kept after a successful consume (presumably so the
                // message can still be acknowledged later); on failure it is always released.
                if (!ackMode.equals(InternalAckMode.MANUAL) || failed) {
                    AmqpCommons.releaseChannelIfNeeded(channel);
                }
            } finally {
                // Restored after cleanup so the release above does not run with a pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Creates a consumer on {@code channel} for {@code queueName} and consumes one message.
     *
     * @param connection  connection used to create the consumer
     * @param queueName   queue to consume from
     * @param maxWait     maximum wait, expressed in {@code waitUnit}
     * @param waitUnit    unit of {@code maxWait}; converted to milliseconds for the consumer
     * @param ackMode     ACK mode handed to the consumer
     * @param consumerTag consumer tag handed to the consumer
     * @param channel     channel the consumer is created on
     * @return the message returned by the consumer
     */
    private AmqpMessage consumeMessage(AmqpTransactionalConnection connection, String queueName, Long maxWait,
                                       TimeUnit waitUnit, InternalAckMode ackMode, String consumerTag,
                                       MuleAmqpChannel channel) throws IOException, InterruptedException {
        long timeoutMillis = waitUnit.toMillis(maxWait.longValue());
        return connection.createConsumer(channel, queueName, timeoutMillis, consumerTag, ackMode).consume();
    }

    /**
     * Ensures the target queue exists, declaring it from {@code fallbackDefinition} when allowed.
     *
     * @param connection          connection used to create a new channel for the declaration
     * @param queueName           name of the queue to check or declare
     * @param fallbackDefinition  definition to declare the queue from; may be {@code null}
     * @param transactionalAction transactional action passed through to channel creation
     * @param channel             channel used for the existence check
     * @param createFallbackQueue whether declaring a missing queue is permitted
     * @return {@code channel} when the queue already exists, otherwise the newly created channel
     *         on which the queue was declared
     * @throws AmqpQueueNotFoundException       if the queue is missing and no definition was given
     * @throws AmqpCreationNotAllowedException  if the queue is missing, a definition was given, but
     *                                          creation is not permitted
     */
    private MuleAmqpChannel declareTargetQueueIfNeeded(AmqpTransactionalConnection connection, String queueName,
                                                       QueueDefinition fallbackDefinition,
                                                       OperationTransactionalAction transactionalAction,
                                                       MuleAmqpChannel channel, boolean createFallbackQueue)
            throws IOException {
        if (new AmqpQueueDeclarer(channel, fallbackDefinition, queueName).queueExists()) {
            return channel;
        }
        if (fallbackDefinition == null) {
            throw new AmqpQueueNotFoundException("Queue was not found.");
        }
        if (!createFallbackQueue) {
            throw new AmqpCreationNotAllowedException("Creation not allowed for queue: " + queueName + ". Set createFallbackQueue to true or create the queue.");
        }
        // A fresh channel replaces the one used for the existence check.
        // NOTE(review): the original channel is not released here — presumably the failed
        // existence check leaves it unusable; confirm against AmqpQueueDeclarer.queueExists().
        MuleAmqpChannel declaringChannel = AmqpCommons.createAmqpChannel(connection, this.channelManager, transactionalAction, true);
        new AmqpQueueDeclarer(declaringChannel, fallbackDefinition, queueName).declareActive();
        return declaringChannel;
    }

    /**
     * Resolves the ACK mode for the operation: the configured mode (with {@code AUTO} mapped to
     * {@code IMMEDIATE}) is used as the base value and {@code operationAckMode} is applied as the
     * override through {@link AmqpCommons#resolveOverride}.
     */
    private InternalAckMode resolveAck(AmqpConsumerConfig consumerConfig, ConsumerAckMode operationAckMode) {
        InternalAckMode configured = AmqpCommons.toInternalAckMode(consumerConfig.getAckMode());
        InternalAckMode base = InternalAckMode.AUTO.equals(configured) ? InternalAckMode.IMMEDIATE : configured;
        return (InternalAckMode) AmqpCommons.resolveOverride(base, AmqpCommons.toInternalAckMode(operationAckMode));
    }
}
