package com.mule.extensions.amqp.internal.consumer;

import com.mule.extensions.amqp.api.exception.AmqpConsumeTimeoutException;
import com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.InternalAckMode;
import com.mule.extensions.amqp.internal.message.AmqpMessage;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mule/extensions/amqp/internal/consumer/DefaultAmqpMessageConsumer.class */
public class DefaultAmqpMessageConsumer implements AmqpMessageConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpMessageConsumer.class);
    private Channel channel;
    private String queue;
    private long timeout;
    private InternalAckMode ackMode;
    private String consumerTag;

    public DefaultAmqpMessageConsumer(Channel channel, String str, long j, InternalAckMode internalAckMode, String str2) {
        this.channel = channel;
        this.queue = str;
        this.timeout = j;
        this.ackMode = internalAckMode;
        this.consumerTag = str2;
    }

    @Override // com.mule.extensions.amqp.internal.consumer.AmqpMessageConsumer
    public AmqpMessage consume(String str) throws IOException, InterruptedException {
        return this.timeout == 0 ? receiveNoWait() : receiveWithTimeout(this.timeout, str);
    }

    private AmqpMessage receiveNoWait() throws IOException {
        GetResponse basicGet = this.channel.basicGet(this.queue, this.ackMode.isImmediateAck());
        return new AmqpMessage(null, basicGet.getEnvelope(), AmqpCommons.getPropertiesFromBasicProperties(basicGet.getProps()), basicGet.getProps().getHeaders(), basicGet.getBody());
    }

    private AmqpMessage receiveWithTimeout(long j, String str) throws IOException, InterruptedException, AmqpConsumeTimeoutException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Waiting for a message, timeout will be in [%s] millis", Long.valueOf(j)));
        }
        SingleMessageQueueingConsumer singleMessageQueueingConsumer = new SingleMessageQueueingConsumer(this.channel, str);
        if (this.consumerTag == null) {
            this.consumerTag = this.channel.basicConsume(this.queue, false, singleMessageQueueingConsumer);
        } else {
            this.channel.basicConsume(this.queue, false, this.consumerTag, singleMessageQueueingConsumer);
        }
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        QueueingConsumer.Delivery nextDelivery = j != -1 ? singleMessageQueueingConsumer.nextDelivery(j) : singleMessageQueueingConsumer.nextDelivery();
        stopWatch.stop();
        if (nextDelivery == null && stopWatch.getTime() >= j) {
            throw new AmqpConsumeTimeoutException(String.format("Failed to retrieve a Message on queue [%s]: operation timed out.", this.queue));
        }
        if (this.ackMode.isImmediateAck()) {
            this.channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);
        }
        try {
            this.channel.basicCancel(this.consumerTag);
        } catch (Exception e) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Subscription to channel with consumerTag " + this.consumerTag + " could not be closed.", e);
            }
        }
        return new AmqpMessage(this.consumerTag, nextDelivery.getEnvelope(), AmqpCommons.getPropertiesFromBasicProperties(nextDelivery.getProperties()), nextDelivery.getProperties().getHeaders(), nextDelivery.getBody());
    }

    @Override // com.mule.extensions.amqp.internal.consumer.AmqpMessageConsumer
    public AmqpMessage consume() throws IOException, InterruptedException {
        return consume(null);
    }
}
