package com.mule.extensions.amqp.internal.connection;

import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.connection.channel.TransactionStatus;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import java.io.IOException;
import java.util.Optional;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.extension.api.connectivity.TransactionalConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mule/extensions/amqp/internal/connection/AmqpTransactionalConnection.class */
public class AmqpTransactionalConnection extends AmqpConnection implements TransactionalConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpTransactionalConnection.class);
    private static final String COMMIT = "Commit";
    private static final String ROLLBACK = "Rollback";
    private boolean blockedByBroker;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:com/mule/extensions/amqp/internal/connection/AmqpTransactionalConnection$ChannelAction.class */
    public interface ChannelAction {
        void execute(Channel channel) throws IOException;
    }

    public AmqpTransactionalConnection(Connection connection) {
        super(connection);
        this.blockedByBroker = false;
        connection.addBlockedListener(new BlockedListener() { // from class: com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection.1
            public void handleUnblocked() throws IOException {
                AmqpTransactionalConnection.this.blockedByBroker = false;
            }

            public void handleBlocked(String str) throws IOException {
                AmqpTransactionalConnection.this.blockedByBroker = true;
            }
        });
    }

    public void begin() throws TransactionException {
        getAmqpChannelManager().changeTransactionStatus(TransactionStatus.STARTED);
    }

    public void commit() throws TransactionException {
        try {
            executeTransactionAction(COMMIT, (v0) -> {
                v0.txCommit();
            }, true);
        } catch (IOException e) {
            throw new TransactionException(e);
        }
    }

    public void rollback() throws TransactionException {
        try {
            executeTransactionAction(ROLLBACK, (v0) -> {
                v0.txRollback();
            }, false);
        } catch (IOException e) {
            throw new TransactionException(e);
        }
    }

    private void executeTransactionAction(String str, ChannelAction channelAction, boolean z) throws IOException {
        Optional<Channel> transactedChannel = getAmqpChannelManager().getTransactedChannel();
        Preconditions.checkState(transactedChannel.isPresent(), "Unable to " + str + " transaction, the TX Channel doesn't exist.");
        Channel channel = transactedChannel.get();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("AMQP Transaction " + str + " over Channel [" + channel + "]");
        }
        try {
            channelAction.execute(channel);
            if (z) {
                AmqpCommons.closeQuietly(channel);
                getAmqpChannelManager().changeTransactionStatus(TransactionStatus.NONE);
                getAmqpChannelManager().unbindChannel();
            }
        } catch (Throwable th) {
            if (z) {
                AmqpCommons.closeQuietly(channel);
                getAmqpChannelManager().changeTransactionStatus(TransactionStatus.NONE);
                getAmqpChannelManager().unbindChannel();
            }
            throw th;
        }
    }

    public void addBlockedListener(BlockedListener blockedListener) {
        this.connection.addBlockedListener(blockedListener);
    }

    public boolean isBlockedByBroker() {
        return this.blockedByBroker;
    }

    public void addShutdownListener(ShutdownListener shutdownListener) {
        this.connection.addShutdownListener(shutdownListener);
    }
}
