package org.mule.jms.commons.internal.source.polling;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.XaJmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.source.JmsConnectionExceptionResolver;
import org.mule.jms.commons.internal.source.JmsMessageDispatcher;
import org.mule.jms.commons.internal.source.NullJmsListenerLock;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.api.util.Pair;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.extension.api.connectivity.XATransactionalConnection;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/jms/commons/internal/source/polling/JmsXaMessageConsumer.class */
/**
 * Polling worker that receives JMS messages inside a transaction and hands them to a
 * {@link JmsMessageDispatcher}.
 *
 * <p>Each iteration of {@link #run()} creates a {@link SourceCallbackContext}, binds a connection
 * obtained from the {@link ConnectionProvider} to it, obtains a {@link InternalAckMode#TRANSACTED}
 * session, and waits up to {@link #POLLING_TIMEOUT} for a message. Polling continues until a
 * message has been dispatched successfully, {@link #stop()} is called, or an unrecoverable error
 * is reported through {@link SourceCallback#onConnectionException(ConnectionException)}.
 *
 * <p>{@link #stop()} may be called from a thread other than the one executing {@link #run()}; the
 * stop flag is {@code volatile} so that the request becomes visible to the polling loop.
 */
public class JmsXaMessageConsumer implements Runnable {
    /** Maximum number of attempts made to bind the connection to the transaction. */
    protected static final int MAX_TRANSACTION_BINDING_RETRIES = 10;
    /** When {@code true} (system property {@code enableTxCommitOnEmptyMessage}), an empty poll commits instead of rolling back. */
    private static final boolean TX_COMMIT_ON_EMPTY_MESSAGE_ENABLED = Boolean.parseBoolean(System.getProperty("enableTxCommitOnEmptyMessage", "false"));
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsXaMessageConsumer.class);
    /** Timeout, in milliseconds, passed to {@link MessageConsumer#receive(long)}. */
    private static final long POLLING_TIMEOUT = 2000;
    /** Identifier used only to tag this consumer's log lines. */
    private final int id;
    private final MessageConsumerFactory consumer;
    private final SourceCallback sourceCallback;
    private final JmsSessionManager sessionManager;
    private final ConnectionProvider connectionProvider;
    private final JmsMessageDispatcher dispatcher;
    /** Counted down once, after the first call to {@link #initializePoll()} finishes (successfully or not); {@code null} afterwards. */
    private CountDownLatch initializationCountDownLatch;
    /** Session of the poll currently in progress; read lazily by the dispatcher through a supplier. */
    private JmsSession session;
    /** Set by {@link #stop()}, possibly from another thread, hence volatile. */
    private volatile boolean stopRequested = false;

    /**
     * Creates the consumer and the dispatcher it delegates received messages to.
     *
     * @param messageConsumerFactory factory used to create the JMS consumer for each poll's session
     * @param sourceCallback callback used to create contexts and to report connection failures
     * @param jmsSessionManager session manager; its bound session is released when {@link #run()} ends
     * @param connectionProvider provider of the connection used for polling
     * @param jmsConfig forwarded to the {@link JmsMessageDispatcher}
     * @param str forwarded to the {@link JmsMessageDispatcher}
     * @param str2 forwarded to the {@link JmsMessageDispatcher}
     * @param jmsSpecification forwarded to the {@link JmsMessageDispatcher}
     * @param i identifier of this consumer, used in log messages
     * @param countDownLatch latch released after the first poll initialization attempt; may be {@code null}
     * @param jmsConnectionExceptionResolver forwarded to the {@link JmsMessageDispatcher}
     */
    public JmsXaMessageConsumer(MessageConsumerFactory messageConsumerFactory, SourceCallback sourceCallback, JmsSessionManager jmsSessionManager, ConnectionProvider connectionProvider, JmsConfig jmsConfig, String str, String str2, JmsSpecification jmsSpecification, int i, CountDownLatch countDownLatch, JmsConnectionExceptionResolver jmsConnectionExceptionResolver) {
        this.consumer = messageConsumerFactory;
        this.sourceCallback = sourceCallback;
        this.sessionManager = jmsSessionManager;
        this.connectionProvider = connectionProvider;
        this.id = i;
        this.initializationCountDownLatch = countDownLatch;
        // The supplier defers the lookup so the dispatcher always sees the session of the current poll.
        this.dispatcher = new JmsMessageDispatcher(jmsConfig, str, str2, jmsSpecification, () -> this.session, InternalAckMode.TRANSACTED, jmsSessionManager, sourceCallback, new NullJmsListenerLock(), jmsConnectionExceptionResolver);
    }

    /**
     * Polls for messages until one is dispatched, a stop is requested, or an error ends the loop.
     *
     * <p>A {@link JMSException} raised while polling is logged and the loop continues. Any other
     * exception ends the loop and is reported through
     * {@link SourceCallback#onConnectionException(ConnectionException)}. On every exit path the
     * session is unbound from the session manager and the last created consumer is closed.
     */
    @Override
    public synchronized void run() {
        MessageConsumer messageConsumer = null;
        try {
            boolean keepPolling = true;
            LOGGER.debug("[{}] : Starting to poll", id);
            while (keepPolling && !stopRequested) {
                try {
                    Pair<SourceCallbackContext, JmsSession> poll = initializePoll();
                    session = poll.getSecond();
                    messageConsumer = consumer.createConsumer(session).get();
                    Message message = messageConsumer.receive(POLLING_TIMEOUT);
                    keepPolling = dispatchMessage(message, poll.getFirst());
                } catch (JMSException e) {
                    LOGGER.error("[" + id + "] : Unknown error when trying to poll message", e);
                } finally {
                    // Nothing was handed to the flow (or the attempt failed): roll back whatever
                    // transaction is still current before the next iteration or before leaving.
                    if (keepPolling) {
                        TransactionCoordination.getInstance().rollbackCurrentTransaction();
                    }
                }
            }
            if (stopRequested) {
                LOGGER.debug("[{}] : Stopping poll", id);
            }
            LOGGER.debug("[{}] : Finishing poll", id);
        } catch (ConnectionException e) {
            LOGGER.debug("[{}] : Finishing poll due to {}:{}", id, e.getClass(), e.getMessage());
            sourceCallback.onConnectionException(e);
        } catch (Exception e) {
            LOGGER.debug("[{}] : Finishing poll due to {}:{}", id, e.getClass(), e.getMessage());
            sourceCallback.onConnectionException(new ConnectionException(e, "Unexpected error occurred trying to poll a message"));
        } finally {
            sessionManager.unbindSession();
            // Close the consumer that was actually created, also when polling ended with an error.
            JmsCommons.closeQuietly(messageConsumer);
        }
    }

    /**
     * Prepares a single poll: creates the callback context, binds the connection to the
     * transaction and obtains a transacted session.
     *
     * <p>The initialization latch, if still present, is released when this method finishes,
     * whether it succeeds or throws.
     *
     * @return the created context paired with the session to poll on
     * @throws ConnectionException if the connection cannot be obtained or bound
     * @throws TransactionException if binding the connection to the transaction keeps failing
     * @throws MuleRuntimeException if the session cannot be obtained because of a {@link JMSException}
     */
    private Pair<SourceCallbackContext, JmsSession> initializePoll() throws ConnectionException, TransactionException {
        try {
            LOGGER.trace("[{}] : initializing poll ", id);
            SourceCallbackContext context = sourceCallback.createContext();
            XaJmsTransactionalConnection connection = (XaJmsTransactionalConnection) connectionProvider.connect();
            bindTransaction(context, connection);
            return new Pair<>(context, connection.getSession(InternalAckMode.TRANSACTED, false));
        } catch (JMSException e) {
            throw new MuleRuntimeException(e);
        } finally {
            releaseInitializationLatch();
        }
    }

    /** Counts the initialization latch down once and forgets it so later polls do not touch it again. */
    private void releaseInitializationLatch() {
        if (initializationCountDownLatch != null) {
            initializationCountDownLatch.countDown();
            initializationCountDownLatch = null;
        }
    }

    /**
     * Handles the outcome of a receive call.
     *
     * @param message the received message, or {@code null} if the receive timed out
     * @param sourceCallbackContext context of the current poll
     * @return {@code false} when the message was handed to the dispatcher successfully;
     *         {@code true} when nothing was dispatched (stop requested, no message, or dispatch
     *         failure) and the transaction was rolled back or, for an empty poll with
     *         {@code enableTxCommitOnEmptyMessage} set, committed
     */
    private boolean dispatchMessage(Message message, SourceCallbackContext sourceCallbackContext) {
        if (stopRequested) {
            LOGGER.trace("[{}] : Stop has been requested, rolling back current transaction.", id);
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
            return true;
        }
        if (message != null) {
            LOGGER.trace("[{}] : received message, handling to the flow.", id);
            sourceCallbackContext.addVariable("CONSUMER", this);
            try {
                dispatcher.dispatchMessage(message, sourceCallbackContext);
                return false;
            } catch (Exception e) {
                // Keep the cause in the log instead of discarding it silently.
                LOGGER.trace("[{}] : Message dispatch failed, rolling back transaction.", id, e);
                TransactionCoordination.getInstance().rollbackCurrentTransaction();
                return true;
            }
        }
        if (TX_COMMIT_ON_EMPTY_MESSAGE_ENABLED) {
            LOGGER.debug("[{}] : No message found, committing transaction.", id);
            TransactionCoordination.getInstance().commitCurrentTransaction();
        } else {
            LOGGER.debug("[{}] : No message found, rolling back transaction.", id);
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
        }
        return true;
    }

    /**
     * Binds a connection to the given context, retrying up to
     * {@link #MAX_TRANSACTION_BINDING_RETRIES} times when a {@link TransactionException} occurs.
     * Between attempts the current transaction is rolled back (best effort) and the context's
     * {@code connection} field is reset.
     *
     * @param sourceCallbackContext context to bind the connection into
     * @param xATransactionalConnection connection of the current poll; only used for logging here
     * @throws ConnectionException if obtaining or binding the connection fails
     * @throws TransactionException if the last binding attempt fails
     * @throws MuleRuntimeException if the context's {@code connection} field cannot be reset
     */
    private void bindTransaction(SourceCallbackContext sourceCallbackContext, XATransactionalConnection xATransactionalConnection) throws ConnectionException, TransactionException {
        for (int attempt = 1; attempt <= MAX_TRANSACTION_BINDING_RETRIES; attempt++) {
            try {
                LOGGER.trace("[{}] : about to bind connection [{}] into context: [{}]", id, xATransactionalConnection, sourceCallbackContext);
                // NOTE(review): this binds a freshly requested connection rather than the one
                // passed in and logged above; presumably the provider returns the same instance -
                // confirm before changing.
                sourceCallbackContext.bindConnection(connectionProvider.connect());
                return;
            } catch (TransactionException e) {
                LOGGER.debug("Internal error, unable to bind connection to transaction. Attempt {}/{}", attempt, MAX_TRANSACTION_BINDING_RETRIES, e);
                if (attempt == MAX_TRANSACTION_BINDING_RETRIES) {
                    throw e;
                }
                rollbackBeforeRetry();
                clearBoundConnection(sourceCallbackContext);
            }
        }
    }

    /** Rolls back the current transaction before a binding retry; a failure here is logged and ignored. */
    private void rollbackBeforeRetry() {
        try {
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
        } catch (Exception rollbackFailure) {
            // Log the rollback failure itself, not the binding exception that triggered the retry.
            LOGGER.debug("Failure on transaction rollback", rollbackFailure);
        }
    }

    /** Resets the context's private {@code connection} field through reflection so binding can be attempted again. */
    private void clearBoundConnection(SourceCallbackContext sourceCallbackContext) {
        try {
            Field connectionField = sourceCallbackContext.getClass().getDeclaredField("connection");
            connectionField.setAccessible(true);
            connectionField.set(sourceCallbackContext, null);
        } catch (Exception e) {
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Requests the polling loop to end. The loop checks the flag before each iteration and
     * before dispatching a received message.
     */
    public void stop() {
        this.stopRequested = true;
    }
}
