package org.mule.extensions.jms.internal.source;

import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.extensions.jms.api.config.AckMode;
import org.mule.extensions.jms.api.connection.JmsSpecification;
import org.mule.extensions.jms.api.destination.ConsumerType;
import org.mule.extensions.jms.api.destination.TopicConsumer;
import org.mule.extensions.jms.api.message.JmsAttributes;
import org.mule.extensions.jms.internal.common.JmsCommons;
import org.mule.extensions.jms.internal.config.InternalAckMode;
import org.mule.extensions.jms.internal.config.JmsConfig;
import org.mule.extensions.jms.internal.connection.JmsTransactionalConnection;
import org.mule.extensions.jms.internal.connection.session.JmsSession;
import org.mule.extensions.jms.internal.connection.session.JmsSessionManager;
import org.mule.extensions.jms.internal.consume.JmsMessageConsumer;
import org.mule.extensions.jms.internal.metadata.JmsOutputResolver;
import org.mule.extensions.jms.internal.support.Jms102bSupport;
import org.mule.extensions.jms.internal.support.JmsSupport;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.StringMessageUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message source that listens on a single JMS destination and pushes every received
 * message into the flow through a {@link SourceCallback}.
 *
 * <p>On start the source connects, resolves the effective ACK mode and creates
 * {@code numberOfConsumers} session/consumer pairs, each with its own message listener
 * and {@link JmsListenerLock}. On stop every listener is released and the connection is
 * handed back to the {@link ConnectionProvider}.
 *
 * <p>When the flow completes successfully and the callback context carries a reply-to
 * destination, a response message is built and published to it.
 */
@ClusterSupport(SourceClusterSupport.DEFAULT_PRIMARY_NODE_ONLY)
@MetadataScope(outputResolver = JmsOutputResolver.class)
@Alias("listener")
@EmitsResponse
public class JmsListener extends Source<Object, JmsAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);

    // Names of the callback-context variables read by onSuccess/onError.
    // NOTE(review): presumably populated by the listener built by JmsMessageListenerFactory - confirm.
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    static final String JMS_LOCK_VAR = "JMS_LOCK";
    static final String JMS_SESSION_VAR = "JMS_SESSION";

    @Inject
    private JmsSessionManager sessionManager;

    @Config
    private JmsConfig config;

    @Connection
    private ConnectionProvider<JmsTransactionalConnection> connectionProvider;

    /** Connection obtained in {@link #onStart}; stays {@code null} until then. */
    private JmsTransactionalConnection connection;

    /** JMS support of the current connection; assigned in {@link #onStart} right after connecting. */
    private JmsSupport jmsSupport;

    // NOTE(review): never assigned in this class - presumably injected by the runtime; confirm.
    private SourceTransactionalAction transactionalAction;

    /** ACK mode actually used for the consumer sessions; computed in {@link #onStart}. */
    private InternalAckMode resolvedAckMode;

    /** Every session/lock/consumer triple created by {@link #onStart}, kept for cleanup. */
    private final List<MessageListenerInfo> createdListeners = new ArrayList<>();

    /** Name of the destination to consume from. */
    @ParameterDsl(allowReferences = false)
    @Parameter
    private String destination;

    /** Kind of consumer to create; its {@code topic()} flag tells whether the destination is a topic. */
    @ConfigOverride
    @Parameter
    private ConsumerType consumerType;

    /** Optional ACK mode; combined with the config's consumer ACK mode through {@code JmsCommons.resolveOverride}. */
    @Optional
    @Parameter
    private AckMode ackMode;

    /** Message selector handed to the consumers on creation. */
    @ConfigOverride
    @Parameter
    private String selector;

    /** Content type handed to the message listener factory. */
    @Example(JmsCommons.EXAMPLE_CONTENT_TYPE)
    @Optional
    @Parameter
    private String inboundContentType;

    /** Encoding handed to the message listener factory. */
    @Example(JmsCommons.EXAMPLE_ENCODING)
    @Optional
    @Parameter
    private String inboundEncoding;

    /** Number of session/consumer pairs to create; must be 1 or greater. */
    @Optional(defaultValue = "4")
    @Parameter
    private int numberOfConsumers;

    /**
     * Immutable holder for the resources that belong to one started consumer, so that
     * they can be released together.
     */
    public static class MessageListenerInfo {
        private final JmsSession session;
        private final JmsListenerLock jmsListenerLock;
        private final JmsMessageConsumer messageConsumer;

        MessageListenerInfo(JmsSession jmsSession, JmsListenerLock jmsListenerLock, JmsMessageConsumer jmsMessageConsumer) {
            this.session = jmsSession;
            this.jmsListenerLock = jmsListenerLock;
            this.messageConsumer = jmsMessageConsumer;
        }

        /** @return the session the consumer was created on */
        public JmsSession getSession() {
            return this.session;
        }

        /** @return the lock associated with this consumer's listener */
        public JmsListenerLock getLock() {
            return this.jmsListenerLock;
        }

        /** @return the message consumer */
        public JmsMessageConsumer getConsumer() {
            return this.messageConsumer;
        }
    }

    /**
     * Convenience overload that resolves the {@link SourceCallback} from the given context.
     *
     * @param sourceCallbackContext context whose callback is notified
     * @param exc                   the failure to inspect
     */
    static void notifyIfConnectionProblem(SourceCallbackContext sourceCallbackContext, Exception exc) {
        notifyIfConnectionProblem(sourceCallbackContext.getSourceCallback(), exc);
    }

    /**
     * Notifies the callback when a {@link ConnectionException} can be extracted from
     * {@code exc}; does nothing otherwise.
     *
     * @param sourceCallback callback to notify
     * @param exc            the failure to inspect
     */
    public static void notifyIfConnectionProblem(SourceCallback<?, ?> sourceCallback, Exception exc) {
        ExceptionUtils.extractConnectionException(exc).ifPresent(sourceCallback::onConnectionException);
    }

    /**
     * Connects and starts {@code numberOfConsumers} consumers on the configured destination.
     *
     * @param sourceCallback callback that receives the messages and connection failures
     * @throws MuleException            if connecting fails or any consumer cannot be created
     * @throws IllegalArgumentException if {@code numberOfConsumers} is not valid for the consumer type
     */
    public void onStart(SourceCallback<Object, JmsAttributes> sourceCallback) throws MuleException {
        LOGGER.debug("Starting JMS Message Listener");

        // A source that always begins a transaction forces TRANSACTED; otherwise the
        // listener-level ACK mode is resolved against the one from the config.
        this.resolvedAckMode = this.transactionalAction.equals(SourceTransactionalAction.ALWAYS_BEGIN)
                ? InternalAckMode.TRANSACTED
                : (InternalAckMode) JmsCommons.resolveOverride(
                        JmsCommons.toInternalAckMode(this.config.getConsumerConfig().getAckMode()),
                        JmsCommons.toInternalAckMode(this.ackMode));

        this.connection = this.connectionProvider.connect();
        this.jmsSupport = this.connection.getJmsSupport();
        this.connection.registerExceptionListener(jmsException ->
                sourceCallback.onConnectionException(new ConnectionException(jmsException, this.connection)));

        JmsMessageListenerFactory listenerFactory = new JmsMessageListenerFactory(this.resolvedAckMode, this.inboundEncoding, this.inboundContentType, this.config, this.sessionManager, this.jmsSupport, sourceCallback, this.connectionProvider);

        // Validation consults jmsSupport, which is only available once connected.
        validateNumberOfConsumers(this.numberOfConsumers);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting JMS Listener with [{}] consumers on destination [{}] of type [{}] with AckMode [{}]",
                    this.numberOfConsumers, this.destination, JmsCommons.getDestinationType(this.consumerType), this.resolvedAckMode.name());
        }

        for (int started = 0; started < this.numberOfConsumers; started++) {
            startConsumer(listenerFactory);
        }
    }

    /**
     * Creates one session/consumer pair, registers it for later cleanup and attaches a
     * message listener to it. On any failure all consumers created so far are released.
     *
     * @param listenerFactory factory for the message listener attached to the consumer
     * @throws ConnectionException wrapping whatever made the creation fail
     */
    private void startConsumer(JmsMessageListenerFactory listenerFactory) throws ConnectionException {
        JmsSession session = null;
        boolean registered = false;
        try {
            session = this.connection.createSession(this.resolvedAckMode, this.consumerType.topic());
            JmsMessageConsumer consumer = this.connection.createConsumer(session, this.jmsSupport.createDestination(session.get(), this.destination, this.consumerType.topic()), this.selector, this.consumerType);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Creating Message Listener on Session [{}] for destination [{}]", session.get(), this.destination);
            }
            JmsListenerLock lock = createJmsLock();
            this.createdListeners.add(new MessageListenerInfo(session, lock, consumer));
            registered = true;
            consumer.listen(listenerFactory.createMessageListener(session, lock));
        } catch (Exception e) {
            String message = String.format("An error occurred while creating the consumers for destination [%s:%s]: %s", JmsCommons.getDestinationType(this.consumerType), this.destination, e.getMessage());
            LOGGER.error(message, e);
            // A session that never made it into createdListeners would not be reached by
            // releaseListeners(), so it has to be closed here to avoid leaking it.
            if (!registered && session != null) {
                JmsCommons.closeQuietly(session);
            }
            releaseListeners();
            throw new ConnectionException(message, e, (ErrorType) null, this.connection);
        }
    }

    /**
     * Releases every created listener and hands the connection, if one was obtained,
     * back to the connection provider.
     */
    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Stopping JMS Listener on destination [{}:{}]", JmsCommons.getDestinationType(this.consumerType), this.destination);
        }
        releaseListeners();
        if (this.connection != null) {
            this.connectionProvider.disconnect(this.connection);
        }
    }

    /**
     * Success callback: unlocks the listener lock stored in the context (if any) and,
     * when both a reply-to destination and a session are present in the context, sends
     * the response built from {@code jmsResponseMessageBuilder}.
     *
     * @param jmsResponseMessageBuilder builder for the response message
     * @param correlationInfo           correlation data used when building the response
     * @param sourceCallbackContext     context carrying the lock, session and reply-to variables
     */
    @OnSuccess
    public void onSuccess(@ParameterGroup(name = "Response", showInDsl = true) JmsResponseMessageBuilder jmsResponseMessageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext sourceCallbackContext) {
        // Explicit type witnesses: getVariable is generic on its return type and has no
        // target type here, so without them the variables would be inferred as Object.
        sourceCallbackContext.<JmsListenerLock>getVariable(JMS_LOCK_VAR).ifPresent(JmsListenerLock::unlock);
        sourceCallbackContext.<Destination>getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(replyTo ->
                sourceCallbackContext.<JmsSession>getVariable(JMS_SESSION_VAR).ifPresent(listenerSession ->
                        doReply(jmsResponseMessageBuilder, sourceCallbackContext, replyTo, correlationInfo, listenerSession)));
    }

    /**
     * Error callback: releases the listener lock stored in the context (if any). For the
     * AUTO and DUPS_OK ACK modes the lock is released with the failure, for every other
     * mode it is released normally.
     *
     * @param error                 the error produced by the flow
     * @param sourceCallbackContext context carrying the lock variable
     */
    @OnError
    public void onError(Error error, SourceCallbackContext sourceCallbackContext) {
        sourceCallbackContext.<JmsListenerLock>getVariable(JMS_LOCK_VAR).ifPresent(lock -> {
            boolean failLock = this.resolvedAckMode.equals(InternalAckMode.AUTO)
                    || this.resolvedAckMode.equals(InternalAckMode.DUPS_OK);
            if (failLock) {
                lock.unlockWithFailure(error);
            } else {
                lock.unlock();
            }
        });
    }

    /**
     * Builds the response message and publishes it to {@code destination} using a
     * dedicated AUTO-acknowledge session that is closed once the publish attempt ends.
     * Failures are logged and, when they carry a connection problem, reported to the
     * source callback; they are never propagated to the caller.
     *
     * @param jmsResponseMessageBuilder builder for the response message
     * @param sourceCallbackContext     context used to report connection problems
     * @param destination               reply-to destination
     * @param correlationInfo           correlation data used when building the response
     * @param jmsSession                session of the received message, used to build the response
     */
    private void doReply(JmsResponseMessageBuilder jmsResponseMessageBuilder, SourceCallbackContext sourceCallbackContext, Destination destination, CorrelationInfo correlationInfo, JmsSession jmsSession) {
        boolean isTopic = replyDestinationIsTopic(destination);

        String destinationName;
        try {
            destinationName = isTopic ? ((Topic) destination).getTopicName() : ((Queue) destination).getQueueName();
        } catch (JMSException e) {
            LOGGER.error(String.format("An error occurred during reply. Failed to obtain the destination name: %s", e.getMessage()), e);
            notifyIfConnectionProblem(sourceCallbackContext, e);
            return;
        }

        String destinationType = isTopic ? JmsCommons.TOPIC : JmsCommons.QUEUE;
        JmsSession replySession = null;
        try {
            LOGGER.debug("Begin reply to destination [{}] of type [{}]", destinationName, destinationType);
            Message reply = jmsResponseMessageBuilder.build(this.connection.getJmsSupport(), OutboundCorrelationStrategy.AUTO, correlationInfo, jmsSession.get(), this.config);
            LOGGER.debug("Message built, sending message to {}", destinationName);
            replySession = this.connection.createSession(InternalAckMode.AUTO, isTopic);
            this.connection.createProducer(replySession, destination, isTopic).publish(reply, jmsResponseMessageBuilder);
        } catch (Exception e) {
            LOGGER.error(String.format("An error occurred during reply to destination [%s] of type [%s]: %s", destinationName, destinationType, e.getMessage()), e);
            notifyIfConnectionProblem(sourceCallbackContext, e);
        } finally {
            // One session is created per reply; without closing it every reply would
            // leave a session (and its producer) open for the life of the connection.
            // NOTE(review): relies on closing the session also releasing its producer,
            // as the JMS Session.close() contract states - confirm for JmsSession.
            if (replySession != null) {
                JmsCommons.closeQuietly(replySession);
            }
        }
    }

    /**
     * Tells whether the reply destination is a topic. Logs an error when, under the
     * JMS 1.0.2b support, the destination implements both {@link Topic} and {@link Queue}.
     *
     * @param destination the reply-to destination
     * @return {@code true} when {@code destination} is a {@link Topic}
     */
    private boolean replyDestinationIsTopic(Destination destination) {
        boolean topic = destination instanceof Topic;
        if (topic && (destination instanceof Queue) && (this.jmsSupport instanceof Jms102bSupport)) {
            LOGGER.error(StringMessageUtils.getBoilerPlate("Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to http://www.mulesoft.org/jira"));
        }
        return topic;
    }

    /**
     * Creates the lock for a new listener: a {@code NullJmsListenerLock} for the
     * IMMEDIATE and TRANSACTED ACK modes, a {@code DefaultJmsListenerLock} otherwise.
     *
     * @return the lock matching the resolved ACK mode
     */
    private JmsListenerLock createJmsLock() {
        boolean lockSkipped = this.resolvedAckMode.equals(InternalAckMode.IMMEDIATE)
                || this.resolvedAckMode.equals(InternalAckMode.TRANSACTED);
        if (!lockSkipped) {
            return new DefaultJmsListenerLock();
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Session lock skipped for ACK mode [{}].", this.resolvedAckMode.name());
        }
        return new NullJmsListenerLock();
    }

    /**
     * Checks that the requested consumer count is at least 1 and, for topics, that more
     * than one consumer is only requested when the topic consumer supports it.
     *
     * @param requested the number of consumers asked for
     * @throws IllegalArgumentException if the number is not acceptable
     */
    private void validateNumberOfConsumers(int requested) {
        if (requested < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + requested + "]. The number should be 1 or greater.");
        }
        if (requested > 1 && this.consumerType.topic() && !isCapableOfMultiConsumersOnTopic((TopicConsumer) this.consumerType)) {
            throw new IllegalArgumentException("Destination [" + this.destination + "] is a topic, but [" + requested + "] receivers have been requested. This is only possible for 'shared' topic consumers, otherwise use 1.");
        }
    }

    /**
     * @param topicConsumer the topic consumer configuration
     * @return {@code true} when the connection's specification is JMS 2.0 and the consumer is shared
     */
    private boolean isCapableOfMultiConsumersOnTopic(TopicConsumer topicConsumer) {
        return this.jmsSupport.getSpecification().equals(JmsSpecification.JMS_2_0) && topicConsumer.isShared();
    }

    /**
     * Releases every registered listener and empties the registry. A runtime failure
     * while releasing one listener is logged and does not prevent the remaining ones
     * from being released.
     */
    private void releaseListeners() {
        try {
            for (MessageListenerInfo listener : this.createdListeners) {
                try {
                    releaseListener(listener);
                } catch (RuntimeException e) {
                    LOGGER.error("An unexpected error occurred while releasing a JMS listener.", e);
                }
            }
        } finally {
            this.createdListeners.clear();
        }
    }

    /** Unlocks the listener's lock with failure, then closes its consumer and session, attempting every step. */
    private void releaseListener(MessageListenerInfo listener) {
        try {
            listener.getLock().unlockWithFailure();
        } finally {
            try {
                closeConsumer(listener.getConsumer());
            } finally {
                JmsCommons.closeQuietly(listener.getSession());
            }
        }
    }

    /**
     * Detaches the message listener from the consumer and closes it. A failure to
     * detach is logged and the consumer is closed regardless.
     *
     * @param jmsMessageConsumer the consumer to shut down
     */
    private void closeConsumer(JmsMessageConsumer jmsMessageConsumer) {
        try {
            jmsMessageConsumer.listen(null);
        } catch (JMSException e) {
            LOGGER.error(String.format("An unexpected error occurred trying to turn off a MessageListener [%s].", jmsMessageConsumer), e);
        }
        JmsCommons.closeQuietly(jmsMessageConsumer);
    }
}
